From e653ebff32e8e9fcb497a26e4af8888024790464 Mon Sep 17 00:00:00 2001 From: Anqi Date: Fri, 5 Jan 2024 16:45:00 +0800 Subject: [PATCH 1/2] refactor handshakeKey --- .../scala/com/vesoft/exchange/common/config/Configs.scala | 6 +++--- .../vesoft/exchange/common/processor/ReloadProcessor.scala | 2 +- .../vesoft/nebula/exchange/processor/EdgeProcessor.scala | 4 ++-- .../nebula/exchange/processor/VerticesProcessor.scala | 4 ++-- .../src/main/resources/application.conf | 4 ++-- .../vesoft/nebula/exchange/processor/EdgeProcessor.scala | 4 ++-- .../nebula/exchange/processor/VerticesProcessor.scala | 4 ++-- .../vesoft/nebula/exchange/processor/EdgeProcessor.scala | 4 ++-- .../nebula/exchange/processor/VerticesProcessor.scala | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala index 68d33ed..3a1413a 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala @@ -51,7 +51,7 @@ object WriteMode extends Enumeration { case class DataBaseConfigEntry(graphAddress: List[String], space: String, metaAddresses: List[String], - version: String) { + handshakeKey: String) { require(graphAddress.nonEmpty, "nebula.address.graph cannot be empty") require(metaAddresses.nonEmpty, "nebula.address.meta cannot be empty") require(space.trim.nonEmpty, "nebula.space cannot be empty") @@ -313,8 +313,8 @@ object Configs { val metaAddresses = nebulaConfig.getStringList("address.meta").asScala.toList val space = nebulaConfig.getString("space") - val version = getStringOrNull(nebulaConfig, "version") - val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses, version) + val handshakeKey = getStringOrNull(nebulaConfig, "handshakeKey") + val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses, handshakeKey) val enableTagless = getOrElse(nebulaConfig, "enableTagless", false) LOG.info(s"DataBase Config ${databaseEntry}") diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala index 62ac4ef..3ad05d4 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala @@ -35,7 +35,7 @@ class ReloadProcessor(data: DataFrame, new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, config.sslConfig, - config.databaseConfig.version) + config.databaseConfig.handshakeKey) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 050cf05..793d2e5 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -57,7 +57,7 @@ class EdgeProcessor(spark: SparkSession, new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, config.sslConfig, - config.databaseConfig.version) + config.databaseConfig.handshakeKey) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, config.rateConfig, @@ -104,7 +104,7 @@ class EdgeProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version) + new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index c8cf285..e9bf3c1 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -62,7 +62,7 @@ class VerticesProcessor(spark: SparkSession, new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, config.sslConfig, - config.databaseConfig.version) + config.databaseConfig.handshakeKey) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, @@ -110,7 +110,7 @@ class VerticesProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version) + new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_2.4/src/main/resources/application.conf b/nebula-exchange_spark_2.4/src/main/resources/application.conf index 7fec30c..db0bf8b 100644 --- a/nebula-exchange_spark_2.4/src/main/resources/application.conf +++ b/nebula-exchange_spark_2.4/src/main/resources/application.conf @@ -27,8 +27,8 @@ user: root pswd: nebula space: test - # the version decided by NebulaGraph server. - version: 3.0.0 + # the handshakeKey for client by NebulaGraph server. + handshakeKey: 3.0.0 # if com.vesoft.exchange.common.config graph ssl encrypted transmission ssl:{ diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 95d996f..cac6b9b 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -58,7 +58,7 @@ class EdgeProcessor(spark: SparkSession, new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, config.sslConfig, - config.databaseConfig.version) + config.databaseConfig.handshakeKey) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, config.rateConfig, @@ -105,7 +105,7 @@ class EdgeProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version) + new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 3763fd8..763206d 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -64,7 +64,7 @@ class VerticesProcessor(spark: SparkSession, new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, config.sslConfig, - config.databaseConfig.version) + config.databaseConfig.handshakeKey) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, @@ -112,7 +112,7 @@ class VerticesProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version) + new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 9f0ea6b..332ed92 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -58,7 +58,7 @@ class EdgeProcessor(spark: SparkSession, new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, config.sslConfig, - config.databaseConfig.version) + config.databaseConfig.handshakeKey) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, config.rateConfig, @@ -106,7 +106,7 @@ class EdgeProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version) + new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index fbd336d..802cfe4 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -64,7 +64,7 @@ class VerticesProcessor(spark: SparkSession, new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, config.sslConfig, - config.databaseConfig.version) + config.databaseConfig.handshakeKey) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, @@ -112,7 +112,7 @@ class VerticesProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version) + new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) From 5bd408d95b117a09e946fde444d5f678deae2130 Mon Sep 17 00:00:00 2001 From: Anqi Date: Fri, 5 Jan 2024 18:01:50 +0800 Subject: [PATCH 2/2] update --- .../main/scala/com/vesoft/exchange/common/GraphProvider.scala | 4 ++-- .../main/scala/com/vesoft/exchange/common/MetaProvider.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala index 6249087..6dd9514 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala @@ -27,7 +27,7 @@ import scala.collection.mutable.ListBuffer class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry: SslConfigEntry, - version: String) + handshakeKey: String) extends AutoCloseable with Serializable { private[this] lazy val LOG = Logger.getLogger(this.getClass) @@ -37,7 +37,7 @@ class GraphProvider(addresses: List[HostAddress], val randAddr = scala.util.Random.shuffle(addresses) nebulaPoolConfig.setTimeout(timeout) - nebulaPoolConfig.setVersion(version) + nebulaPoolConfig.setHandshakeKey(handshakeKey) // com.vesoft.exchange.common.config graph ssl nebulaPoolConfig.setEnableSsl(sslConfigEntry.enableGraph) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala index ab72a2e..03dd002 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala @@ -29,7 +29,7 @@ class MetaProvider(addresses: List[HostAddress], timeout: Int, retry: Int, sslConfigEntry: SslConfigEntry, - version: String) + handshakeKey: String) extends AutoCloseable with Serializable { private[this] lazy val LOG = Logger.getLogger(this.getClass) @@ -50,7 +50,7 @@ class MetaProvider(addresses: List[HostAddress], metaClient = new MetaClient(addresses.asJava, timeout, retry, retry) } - metaClient.setVersion(version) + metaClient.setHandshakeKey(handshakeKey) metaClient.connect() def getPartNumber(space: String): Int = {