Skip to content

Commit

Permalink
remove version key (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Jan 12, 2024
1 parent c391cae commit 8b9a662
Show file tree
Hide file tree
Showing 15 changed files with 24 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import scala.collection.mutable.ListBuffer
*/
class GraphProvider(addresses: List[HostAddress],
timeout: Int,
sslConfigEntry: SslConfigEntry,
handshakeKey: String)
sslConfigEntry: SslConfigEntry)
extends AutoCloseable
with Serializable {
private[this] lazy val LOG = Logger.getLogger(this.getClass)
Expand All @@ -37,7 +36,6 @@ class GraphProvider(addresses: List[HostAddress],
val randAddr = scala.util.Random.shuffle(addresses)

nebulaPoolConfig.setTimeout(timeout)
nebulaPoolConfig.setHandshakeKey(handshakeKey)

// com.vesoft.exchange.common.config graph ssl
nebulaPoolConfig.setEnableSsl(sslConfigEntry.enableGraph)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import scala.collection.mutable.ListBuffer
class MetaProvider(addresses: List[HostAddress],
timeout: Int,
retry: Int,
sslConfigEntry: SslConfigEntry,
handshakeKey: String)
sslConfigEntry: SslConfigEntry)
extends AutoCloseable
with Serializable {
private[this] lazy val LOG = Logger.getLogger(this.getClass)
Expand All @@ -50,7 +49,6 @@ class MetaProvider(addresses: List[HostAddress],
metaClient = new MetaClient(addresses.asJava, timeout, retry, retry)
}

metaClient.setHandshakeKey(handshakeKey)
metaClient.connect()

def getPartNumber(space: String): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ object WriteMode extends Enumeration {
*/
case class DataBaseConfigEntry(graphAddress: List[String],
space: String,
metaAddresses: List[String],
handshakeKey: String) {
metaAddresses: List[String]) {
require(graphAddress.nonEmpty, "nebula.address.graph cannot be empty")
require(metaAddresses.nonEmpty, "nebula.address.meta cannot be empty")
require(space.trim.nonEmpty, "nebula.space cannot be empty")
Expand Down Expand Up @@ -313,8 +312,7 @@ object Configs {
val metaAddresses = nebulaConfig.getStringList("address.meta").asScala.toList

val space = nebulaConfig.getString("space")
val handshakeKey = getStringOrNull(nebulaConfig, "handshakeKey")
val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses, handshakeKey)
val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses)
val enableTagless = getOrElse(nebulaConfig, "enableTagless", false)
LOG.info(s"DataBase Config ${databaseEntry}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class ReloadProcessor(data: DataFrame,
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.handshakeKey)
config.sslConfig)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class GraphProviderSuite {

val sslConfig = SslConfigEntry(false, false, SslType.CA, null, null)
graphProvider =
new GraphProvider(List(new HostAddress("127.0.0.1", 9669)), 5000, sslConfig, null)
new GraphProvider(List(new HostAddress("127.0.0.1", 9669)), 5000, sslConfig)
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MetaProviderSuite {
mockData.close()

val sslConfig = SslConfigEntry(false, false, SslType.CA, null, null)
metaProvider = new MetaProvider(List(new HostAddress("127.0.0.1", 9559)), 5000, 1, sslConfig, null)
metaProvider = new MetaProvider(List(new HostAddress("127.0.0.1", 9559)), 5000, 1, sslConfig)
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ class ConfigsSuite {
val graphAddress = List("127.0.0.1:9669", "127.0.0.1:9670")
val metaAddress = List("127.0.0.1:9559", "127.0.0.1:9560")
val space = "test"
DataBaseConfigEntry(graphAddress, space, metaAddress, null)
DataBaseConfigEntry(graphAddress, space, metaAddress)
}

/**
Expand All @@ -351,7 +351,7 @@ class ConfigsSuite {
val graphAddress = List("127.0.0.1:9669", "127.0.0.1:9670")
val metaAddress = List("127.0.0.1:9559", "127.0.0.1:9560")
assertThrows[IllegalArgumentException] {
DataBaseConfigEntry(graphAddress, "", metaAddress, null)
DataBaseConfigEntry(graphAddress, "", metaAddress)
}
}

Expand All @@ -365,7 +365,7 @@ class ConfigsSuite {
val metaAddress = List("127.0.0.1:9559", "127.0.0.1:9560")

assertThrows[IllegalArgumentException] {
DataBaseConfigEntry(wrongGraphAddress, space, metaAddress, null)
DataBaseConfigEntry(wrongGraphAddress, space, metaAddress)
}
}

Expand All @@ -378,7 +378,7 @@ class ConfigsSuite {
val space = "test"
val wrongMetaAddress = List("127.0.0.1:9559,127.0.0.1:9560")
assertThrows[IllegalArgumentException] {
DataBaseConfigEntry(graphAddress, space, wrongMetaAddress, null)
DataBaseConfigEntry(graphAddress, space, wrongMetaAddress)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class NebulaUtilsSuite {
val address = new ListBuffer[HostAddress]()
address.append(new HostAddress("127.0.0.1", 9559))
val sslConfig = SslConfigEntry(false, false, null, null, null)
val metaProvider = new MetaProvider(address.toList, 6000, 1, sslConfig, null)
val metaProvider = new MetaProvider(address.toList, 6000, 1, sslConfig)

val map: Map[String, Int] =
NebulaUtils.getDataSourceFieldType(sourceConfig, space, metaProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ class EdgeProcessor(spark: SparkSession,
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.handshakeKey)
config.sslConfig)
val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
config.rateConfig,
Expand Down Expand Up @@ -104,7 +103,7 @@ class EdgeProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
new MetaProvider(address, timeout, retry, config.sslConfig)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class VerticesProcessor(spark: SparkSession,
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.handshakeKey)
config.sslConfig)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down Expand Up @@ -110,7 +109,7 @@ class VerticesProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
new MetaProvider(address, timeout, retry, config.sslConfig)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
2 changes: 0 additions & 2 deletions nebula-exchange_spark_2.4/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
user: root
pswd: nebula
space: test
# the handshakeKey for client by NebulaGraph server.
handshakeKey: 3.0.0

# if com.vesoft.exchange.common.config graph ssl encrypted transmission
ssl:{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class EdgeProcessor(spark: SparkSession,
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.handshakeKey)
config.sslConfig)
val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
config.rateConfig,
Expand Down Expand Up @@ -105,7 +104,7 @@ class EdgeProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
new MetaProvider(address, timeout, retry, config.sslConfig)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ class VerticesProcessor(spark: SparkSession,
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.handshakeKey)
config.sslConfig)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down Expand Up @@ -112,7 +111,7 @@ class VerticesProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
new MetaProvider(address, timeout, retry, config.sslConfig)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class EdgeProcessor(spark: SparkSession,
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.handshakeKey)
config.sslConfig)
val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
config.rateConfig,
Expand Down Expand Up @@ -106,7 +105,7 @@ class EdgeProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
new MetaProvider(address, timeout, retry, config.sslConfig)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ class VerticesProcessor(spark: SparkSession,
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.handshakeKey)
config.sslConfig)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down Expand Up @@ -112,7 +111,7 @@ class VerticesProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
new MetaProvider(address, timeout, retry, config.sslConfig)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down

0 comments on commit 8b9a662

Please sign in to comment.