diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala index 6dd9514a..70d251d3 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala @@ -26,8 +26,7 @@ import scala.collection.mutable.ListBuffer */ class GraphProvider(addresses: List[HostAddress], timeout: Int, - sslConfigEntry: SslConfigEntry, - handshakeKey: String) + sslConfigEntry: SslConfigEntry) extends AutoCloseable with Serializable { private[this] lazy val LOG = Logger.getLogger(this.getClass) @@ -37,7 +36,6 @@ class GraphProvider(addresses: List[HostAddress], val randAddr = scala.util.Random.shuffle(addresses) nebulaPoolConfig.setTimeout(timeout) - nebulaPoolConfig.setHandshakeKey(handshakeKey) // com.vesoft.exchange.common.config graph ssl nebulaPoolConfig.setEnableSsl(sslConfigEntry.enableGraph) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala index 03dd0027..b0e851dd 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/MetaProvider.scala @@ -28,8 +28,7 @@ import scala.collection.mutable.ListBuffer class MetaProvider(addresses: List[HostAddress], timeout: Int, retry: Int, - sslConfigEntry: SslConfigEntry, - handshakeKey: String) + sslConfigEntry: SslConfigEntry) extends AutoCloseable with Serializable { private[this] lazy val LOG = Logger.getLogger(this.getClass) @@ -50,7 +49,6 @@ class MetaProvider(addresses: List[HostAddress], metaClient = new MetaClient(addresses.asJava, timeout, retry, retry) } - metaClient.setHandshakeKey(handshakeKey) metaClient.connect() def getPartNumber(space: String): Int = { diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala index 3a1413ad..7819c452 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala @@ -50,8 +50,7 @@ object WriteMode extends Enumeration { */ case class DataBaseConfigEntry(graphAddress: List[String], space: String, - metaAddresses: List[String], - handshakeKey: String) { + metaAddresses: List[String]) { require(graphAddress.nonEmpty, "nebula.address.graph cannot be empty") require(metaAddresses.nonEmpty, "nebula.address.meta cannot be empty") require(space.trim.nonEmpty, "nebula.space cannot be empty") @@ -313,8 +312,7 @@ object Configs { val metaAddresses = nebulaConfig.getStringList("address.meta").asScala.toList val space = nebulaConfig.getString("space") - val handshakeKey = getStringOrNull(nebulaConfig, "handshakeKey") - val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses, handshakeKey) + val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses) val enableTagless = getOrElse(nebulaConfig, "enableTagless", false) LOG.info(s"DataBase Config ${databaseEntry}") diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala index 3ad05d4b..0ec546bb 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala @@ -34,8 +34,7 @@ class ReloadProcessor(data: DataFrame, val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, - config.sslConfig, - config.databaseConfig.handshakeKey) + config.sslConfig) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala index e22b8b81..552f3f9a 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala @@ -27,7 +27,7 @@ class GraphProviderSuite { val sslConfig = SslConfigEntry(false, false, SslType.CA, null, null) graphProvider = - new GraphProvider(List(new HostAddress("127.0.0.1", 9669)), 5000, sslConfig, null) + new GraphProvider(List(new HostAddress("127.0.0.1", 9669)), 5000, sslConfig) } @After diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/MetaProviderSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/MetaProviderSuite.scala index e1b9a433..8f723f60 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/MetaProviderSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/MetaProviderSuite.scala @@ -23,7 +23,7 @@ class MetaProviderSuite { mockData.close() val sslConfig = SslConfigEntry(false, false, SslType.CA, null, null) - metaProvider = new MetaProvider(List(new HostAddress("127.0.0.1", 9559)), 5000, 1, sslConfig, null) + metaProvider = new MetaProvider(List(new HostAddress("127.0.0.1", 9559)), 5000, 1, sslConfig) } @After diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala index 884d0997..e2cf6a8a 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala @@ -340,7 +340,7 @@ class ConfigsSuite { val graphAddress = List("127.0.0.1:9669", "127.0.0.1:9670") val metaAddress = List("127.0.0.1:9559", "127.0.0.1:9560") val space = "test" - DataBaseConfigEntry(graphAddress, space, metaAddress, null) + DataBaseConfigEntry(graphAddress, space, metaAddress) } /** @@ -351,7 +351,7 @@ class ConfigsSuite { val graphAddress = List("127.0.0.1:9669", "127.0.0.1:9670") val metaAddress = List("127.0.0.1:9559", "127.0.0.1:9560") assertThrows[IllegalArgumentException] { - DataBaseConfigEntry(graphAddress, "", metaAddress, null) + DataBaseConfigEntry(graphAddress, "", metaAddress) } } @@ -365,7 +365,7 @@ class ConfigsSuite { val metaAddress = List("127.0.0.1:9559", "127.0.0.1:9560") assertThrows[IllegalArgumentException] { - DataBaseConfigEntry(wrongGraphAddress, space, metaAddress, null) + DataBaseConfigEntry(wrongGraphAddress, space, metaAddress) } } @@ -378,7 +378,7 @@ class ConfigsSuite { val space = "test" val wrongMetaAddress = List("127.0.0.1:9559,127.0.0.1:9560") assertThrows[IllegalArgumentException] { - DataBaseConfigEntry(graphAddress, space, wrongMetaAddress, null) + DataBaseConfigEntry(graphAddress, space, wrongMetaAddress) } } diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala index 2e19d5b0..20bcd2ec 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala @@ -93,7 +93,7 @@ class NebulaUtilsSuite { val address = new ListBuffer[HostAddress]() address.append(new HostAddress("127.0.0.1", 9559)) val sslConfig = SslConfigEntry(false, false, null, null, null) - val metaProvider = new MetaProvider(address.toList, 6000, 1, sslConfig, null) + val metaProvider = new MetaProvider(address.toList, 6000, 1, sslConfig) val map: Map[String, Int] = NebulaUtils.getDataSourceFieldType(sourceConfig, space, metaProvider) diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 793d2e54..50e5f6b6 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -56,8 +56,7 @@ class EdgeProcessor(spark: SparkSession, val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, - config.sslConfig, - config.databaseConfig.handshakeKey) + config.sslConfig) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, config.rateConfig, @@ -104,7 +103,7 @@ class EdgeProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) + new MetaProvider(address, timeout, retry, config.sslConfig) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index e9bf3c11..41c745f5 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -61,8 +61,7 @@ class VerticesProcessor(spark: SparkSession, val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, - config.sslConfig, - config.databaseConfig.handshakeKey) + config.sslConfig) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, @@ -110,7 +109,7 @@ class VerticesProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) + new MetaProvider(address, timeout, retry, config.sslConfig) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_2.4/src/main/resources/application.conf b/nebula-exchange_spark_2.4/src/main/resources/application.conf index db0bf8b6..848063be 100644 --- a/nebula-exchange_spark_2.4/src/main/resources/application.conf +++ b/nebula-exchange_spark_2.4/src/main/resources/application.conf @@ -27,8 +27,6 @@ user: root pswd: nebula space: test - # the handshakeKey for client by NebulaGraph server. - handshakeKey: 3.0.0 # if com.vesoft.exchange.common.config graph ssl encrypted transmission ssl:{ diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index cac6b9b7..35c08cb7 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -57,8 +57,7 @@ class EdgeProcessor(spark: SparkSession, val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, - config.sslConfig, - config.databaseConfig.handshakeKey) + config.sslConfig) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, config.rateConfig, @@ -105,7 +104,7 @@ class EdgeProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) + new MetaProvider(address, timeout, retry, config.sslConfig) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 763206dc..a79f6950 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -63,8 +63,7 @@ class VerticesProcessor(spark: SparkSession, val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, - config.sslConfig, - config.databaseConfig.handshakeKey) + config.sslConfig) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, @@ -112,7 +111,7 @@ class VerticesProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) + new MetaProvider(address, timeout, retry, config.sslConfig) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 332ed92e..25a811ad 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -57,8 +57,7 @@ class EdgeProcessor(spark: SparkSession, val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, - config.sslConfig, - config.databaseConfig.handshakeKey) + config.sslConfig) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, config.rateConfig, @@ -106,7 +105,7 @@ class EdgeProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) + new MetaProvider(address, timeout, retry, config.sslConfig) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 802cfe40..6d375d4a 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -63,8 +63,7 @@ class VerticesProcessor(spark: SparkSession, val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout, - config.sslConfig, - config.databaseConfig.handshakeKey) + config.sslConfig) val writer = new NebulaGraphClientWriter(config.databaseConfig, config.userConfig, @@ -112,7 +111,7 @@ class VerticesProcessor(spark: SparkSession, val timeout = config.connectionConfig.timeout val retry = config.connectionConfig.retry val metaProvider = - new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey) + new MetaProvider(address, timeout, retry, config.sslConfig) val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider) val isVidStringType = metaProvider.getVidType(space) == VidType.STRING val partitionNum = metaProvider.getPartNumber(space)