Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor handshakeKey #193

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.collection.mutable.ListBuffer
class GraphProvider(addresses: List[HostAddress],
timeout: Int,
sslConfigEntry: SslConfigEntry,
version: String)
handshakeKey: String)
extends AutoCloseable
with Serializable {
private[this] lazy val LOG = Logger.getLogger(this.getClass)
Expand All @@ -37,7 +37,7 @@ class GraphProvider(addresses: List[HostAddress],
val randAddr = scala.util.Random.shuffle(addresses)

nebulaPoolConfig.setTimeout(timeout)
nebulaPoolConfig.setVersion(version)
nebulaPoolConfig.setHandshakeKey(handshakeKey)

// com.vesoft.exchange.common.config graph ssl
nebulaPoolConfig.setEnableSsl(sslConfigEntry.enableGraph)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class MetaProvider(addresses: List[HostAddress],
timeout: Int,
retry: Int,
sslConfigEntry: SslConfigEntry,
version: String)
handshakeKey: String)
extends AutoCloseable
with Serializable {
private[this] lazy val LOG = Logger.getLogger(this.getClass)
Expand All @@ -50,7 +50,7 @@ class MetaProvider(addresses: List[HostAddress],
metaClient = new MetaClient(addresses.asJava, timeout, retry, retry)
}

metaClient.setVersion(version)
metaClient.setHandshakeKey(handshakeKey)
metaClient.connect()

def getPartNumber(space: String): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object WriteMode extends Enumeration {
case class DataBaseConfigEntry(graphAddress: List[String],
space: String,
metaAddresses: List[String],
version: String) {
handshakeKey: String) {
require(graphAddress.nonEmpty, "nebula.address.graph cannot be empty")
require(metaAddresses.nonEmpty, "nebula.address.meta cannot be empty")
require(space.trim.nonEmpty, "nebula.space cannot be empty")
Expand Down Expand Up @@ -313,8 +313,8 @@ object Configs {
val metaAddresses = nebulaConfig.getStringList("address.meta").asScala.toList

val space = nebulaConfig.getString("space")
val version = getStringOrNull(nebulaConfig, "version")
val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses, version)
val handshakeKey = getStringOrNull(nebulaConfig, "handshakeKey")
val databaseEntry = DataBaseConfigEntry(addresses, space, metaAddresses, handshakeKey)
val enableTagless = getOrElse(nebulaConfig, "enableTagless", false)
LOG.info(s"DataBase Config ${databaseEntry}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ReloadProcessor(data: DataFrame,
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.version)
config.databaseConfig.handshakeKey)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class EdgeProcessor(spark: SparkSession,
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.version)
config.databaseConfig.handshakeKey)
val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
config.rateConfig,
Expand Down Expand Up @@ -104,7 +104,7 @@ class EdgeProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version)
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class VerticesProcessor(spark: SparkSession,
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.version)
config.databaseConfig.handshakeKey)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down Expand Up @@ -110,7 +110,7 @@ class VerticesProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version)
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
4 changes: 2 additions & 2 deletions nebula-exchange_spark_2.4/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
user: root
pswd: nebula
space: test
# the version decided by NebulaGraph server.
version: 3.0.0
# the handshakeKey for client by NebulaGraph server.
handshakeKey: 3.0.0

# if com.vesoft.exchange.common.config graph ssl encrypted transmission
ssl:{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class EdgeProcessor(spark: SparkSession,
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.version)
config.databaseConfig.handshakeKey)
val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
config.rateConfig,
Expand Down Expand Up @@ -105,7 +105,7 @@ class EdgeProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version)
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class VerticesProcessor(spark: SparkSession,
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.version)
config.databaseConfig.handshakeKey)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down Expand Up @@ -112,7 +112,7 @@ class VerticesProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version)
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class EdgeProcessor(spark: SparkSession,
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.version)
config.databaseConfig.handshakeKey)
val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
config.rateConfig,
Expand Down Expand Up @@ -106,7 +106,7 @@ class EdgeProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version)
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(edgeConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class VerticesProcessor(spark: SparkSession,
new GraphProvider(config.databaseConfig.getGraphAddress,
config.connectionConfig.timeout,
config.sslConfig,
config.databaseConfig.version)
config.databaseConfig.handshakeKey)

val writer = new NebulaGraphClientWriter(config.databaseConfig,
config.userConfig,
Expand Down Expand Up @@ -112,7 +112,7 @@ class VerticesProcessor(spark: SparkSession,
val timeout = config.connectionConfig.timeout
val retry = config.connectionConfig.retry
val metaProvider =
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.version)
new MetaProvider(address, timeout, retry, config.sslConfig, config.databaseConfig.handshakeKey)
val fieldTypeMap = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider)
val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
val partitionNum = metaProvider.getPartNumber(space)
Expand Down
Loading