Skip to content

Commit

Permalink
Added new config admin.client.api.enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
mroiter-larus committed Dec 30, 2021
1 parent a6a14fe commit fe9465c
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 27 deletions.
5 changes: 5 additions & 0 deletions common/src/main/kotlin/streams/config/StreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
const val POLL_INTERVAL = "streams.sink.poll.interval"
const val INSTANCE_WAIT_TIMEOUT = "streams.wait.timeout"
const val INSTANCE_WAIT_TIMEOUT_VALUE = 120000L
const val KAFKA_ADMIN_CLIENT_API_ENABLED = "kafka.admin.client.api.enabled"
const val KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE = true

private const val DEFAULT_TRIGGER_PERIOD: Int = 10000

Expand Down Expand Up @@ -83,6 +85,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
fun getSystemDbWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT, SYSTEM_DB_WAIT_TIMEOUT_VALUE).toString().toLong()

fun getInstanceWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(INSTANCE_WAIT_TIMEOUT, INSTANCE_WAIT_TIMEOUT_VALUE).toString().toLong()

fun isKafkaAdminClientApiEnabled(config: Map<String, Any?>) = config.getOrDefault(KAFKA_ADMIN_CLIENT_API_ENABLED, KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE).toString().toBoolean()
}

private val configLifecycle: ConfigurationLifecycle
Expand Down Expand Up @@ -221,4 +225,5 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe

fun getInstanceWaitTimeout() = Companion.getInstanceWaitTimeout(getConfiguration())

fun isKafkaAdminClientApiEnabled() = Companion.isKafkaAdminClientApiEnabled(getConfiguration())
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092
val streamsSinkConfiguration: StreamsSinkConfiguration = StreamsSinkConfiguration(),
val enableAutoCommit: Boolean = true,
val streamsAsyncCommit: Boolean = false,
val extraProperties: Map<String, String> = emptyMap()) {
val extraProperties: Map<String, String> = emptyMap(),
val adminClientApiEnabled: Boolean = StreamsConfig.KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE) {

companion object {

fun from(cfg: Map<String, String>, dbName: String, isDefaultDb: Boolean): KafkaSinkConfiguration {
val kafkaCfg = create(cfg, dbName, isDefaultDb)
validate(kafkaCfg)
val invalidTopics = getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.streamsSinkConfiguration.topics.allTopics())
val invalidTopics = if (kafkaCfg.adminClientApiEnabled) getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.streamsSinkConfiguration.topics.allTopics()) else emptyList()
return if (invalidTopics.isNotEmpty()) {
kafkaCfg.copy(streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg, dbName, invalidTopics, isDefaultDb))
} else {
Expand All @@ -65,7 +66,6 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092

val streamsSinkConfiguration = StreamsSinkConfiguration.from(configMap = cfg, dbName = dbName, isDefaultDb = isDefaultDb)


return default.copy(keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer),
valueDeserializer = config.getOrDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, default.valueDeserializer),
bootstrapServers = config.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, default.bootstrapServers),
Expand All @@ -74,7 +74,8 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092
enableAutoCommit = config.getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, default.enableAutoCommit).toString().toBoolean(),
streamsAsyncCommit = config.getOrDefault("streams.async.commit", default.streamsAsyncCommit).toString().toBoolean(),
streamsSinkConfiguration = streamsSinkConfiguration,
extraProperties = extraProperties // for what we don't provide a default configuration
extraProperties = extraProperties,
adminClientApiEnabled = config.getOrDefault("admin.client.api.enabled", default.adminClientApiEnabled).toString().toBoolean()// for what we don't provide a default configuration
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import streams.StreamsSinkConfigurationTest
import streams.config.StreamsConfig
import streams.service.TopicValidationException
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue

class KafkaSinkConfigurationTest {
Expand All @@ -37,6 +38,7 @@ class KafkaSinkConfigurationTest {
assertEquals(true, default.enableAutoCommit)
assertEquals(false, default.streamsAsyncCommit)
assertEquals(emptyMap(), default.extraProperties)
assertTrue { default.adminClientApiEnabled }
}

@Test
Expand All @@ -48,28 +50,32 @@ class KafkaSinkConfigurationTest {
val group = "foo"
val autoOffsetReset = "latest"
val autoCommit = "false"
val kafkaAdminClientApiEnabled = "false"
val config = mapOf(topicKey to topicValue,
"kafka.bootstrap.servers" to bootstrap,
"kafka.auto.offset.reset" to autoOffsetReset,
"kafka.enable.auto.commit" to autoCommit,
"kafka.group.id" to group,
"kafka.streams.async.commit" to "true",
"kafka.key.deserializer" to ByteArrayDeserializer::class.java.name,
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name)
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name,
"kafka.admin.client.api.enabled" to kafkaAdminClientApiEnabled)
val expectedMap = mapOf("bootstrap.servers" to bootstrap,
"auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to group,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
"streams.async.commit" to "true",
"key.deserializer" to ByteArrayDeserializer::class.java.name,
"value.deserializer" to KafkaAvroDeserializer::class.java.name)
"value.deserializer" to KafkaAvroDeserializer::class.java.name,
"admin.client.api.enabled" to kafkaAdminClientApiEnabled)

val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, defaultDbName, isDefaultDb = true)
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValue)
assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties)
assertEquals(bootstrap, kafkaSinkConfiguration.bootstrapServers)
assertEquals(autoOffsetReset, kafkaSinkConfiguration.autoOffsetReset)
assertEquals(group, kafkaSinkConfiguration.groupId)
assertFalse { kafkaSinkConfiguration.adminClientApiEnabled }
val resultMap = kafkaSinkConfiguration
.asProperties()
.map { it.key.toString() to it.value.toString() }
Expand All @@ -94,6 +100,7 @@ class KafkaSinkConfigurationTest {
val autoOffsetReset = "latest"
val autoCommit = "false"
val asyncCommit = "true"
val kafkaAdminClientApiEnabled = "false"
val config = mapOf(topicKey to topicValue,
topicKeyFoo to topicValueFoo,
"kafka.bootstrap.servers" to bootstrap,
Expand All @@ -102,14 +109,16 @@ class KafkaSinkConfigurationTest {
"kafka.group.id" to group,
"kafka.streams.async.commit" to asyncCommit,
"kafka.key.deserializer" to ByteArrayDeserializer::class.java.name,
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name)
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name,
"kafka.admin.client.api.enabled" to kafkaAdminClientApiEnabled)
val expectedMap = mapOf("bootstrap.servers" to bootstrap,
"auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to "$group-$dbName",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
"key.deserializer" to ByteArrayDeserializer::class.java.name,
"streams.async.commit" to asyncCommit,
"value.deserializer" to KafkaAvroDeserializer::class.java.name)
"value.deserializer" to KafkaAvroDeserializer::class.java.name,
"admin.client.api.enabled" to kafkaAdminClientApiEnabled)

val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, dbName, isDefaultDb = false)
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValueFoo)
Expand Down Expand Up @@ -179,5 +188,4 @@ class KafkaSinkConfigurationTest {
throw e
}
}

}
21 changes: 21 additions & 0 deletions producer/src/main/kotlin/streams/kafka/AdminService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package streams.kafka

import org.neo4j.logging.Log

interface AdminService {

fun start()

fun stop()

fun isValidTopic(topic: String): Boolean

fun getInvalidTopics(): List<String>

companion object{
fun getInstance(props: KafkaConfiguration, allTopics: List<String>, log: Log): AdminService = when (props.adminClientApiEnabled) {
true -> KafkaAdminService(props, allTopics, log)
else -> DefaultAdminService(log)
}
}
}
17 changes: 17 additions & 0 deletions producer/src/main/kotlin/streams/kafka/DefaultAdminService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package streams.kafka

import org.neo4j.logging.Log

class DefaultAdminService(private val log: Log) : AdminService {

override fun start() {
log.info("No need to start the AdminService to check the topic list. We'll consider the topic's auto creation enabled")
}

override fun stop() {} // Do nothing

override fun isValidTopic(topic: String): Boolean = true

override fun getInvalidTopics(): List<String> = emptyList()

}
10 changes: 5 additions & 5 deletions producer/src/main/kotlin/streams/kafka/KafkaAdminService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import streams.utils.StreamsUtils
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

class KafkaAdminService(private val props: KafkaConfiguration, private val allTopics: List<String>, private val log: Log) {
class KafkaAdminService(private val props: KafkaConfiguration, private val allTopics: List<String>, private val log: Log) : AdminService {
private val client = AdminClient.create(props.asProperties())
private val kafkaTopics: MutableSet<String> = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val isAutoCreateTopicsEnabled = KafkaValidationUtils.isAutoCreateTopicsEnabled(client)
private lateinit var job: Job

fun start() {
override fun start() {
if (!isAutoCreateTopicsEnabled) {
job = GlobalScope.launch(Dispatchers.IO) {
while (isActive) {
Expand All @@ -39,18 +39,18 @@ class KafkaAdminService(private val props: KafkaConfiguration, private val allTo
}
}

fun stop() {
override fun stop() {
StreamsUtils.ignoreExceptions({
runBlocking {
job.cancelAndJoin()
}
}, UninitializedPropertyAccessException::class.java)
}

fun isValidTopic(topic: String) = when (isAutoCreateTopicsEnabled) {
override fun isValidTopic(topic: String) = when (isAutoCreateTopicsEnabled) {
true -> true
else -> kafkaTopics.contains(topic)
}

fun getInvalidTopics() = KafkaValidationUtils.getInvalidTopics(client, allTopics)
override fun getInvalidTopics() = KafkaValidationUtils.getInvalidTopics(client, allTopics)
}
10 changes: 8 additions & 2 deletions producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.neo4j.logging.Log
import streams.config.StreamsConfig
import streams.extensions.getInt
import streams.extensions.toPointCase
import streams.utils.JSONUtils
Expand All @@ -30,7 +31,8 @@ data class KafkaConfiguration(val bootstrapServers: String = "localhost:9092",
val lingerMs: Int = 1,
val topicDiscoveryPollingInterval: Long = TimeUnit.MINUTES.toMillis(5),
val streamsLogCompactionStrategy: String = LogStrategy.delete.toString(),
val extraProperties: Map<String, String> = emptyMap()) {
val extraProperties: Map<String, String> = emptyMap(),
val adminClientApiEnabled: Boolean = StreamsConfig.KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE) {

companion object {
// Visible for testing
Expand All @@ -56,7 +58,11 @@ data class KafkaConfiguration(val bootstrapServers: String = "localhost:9092",
topicDiscoveryPollingInterval = config.getOrDefault("topic.discovery.polling.interval",
default.topicDiscoveryPollingInterval).toString().toLong(),
streamsLogCompactionStrategy = config.getOrDefault("streams.log.compaction.strategy", default.streamsLogCompactionStrategy),
extraProperties = extraProperties // for what we don't provide a default configuration
extraProperties = extraProperties,
adminClientApiEnabled = config.getOrDefault("admin.client.api.enabled",
default.adminClientApiEnabled)
.toString()
.toBoolean() // for what we don't provide a default configuration
)
}

Expand Down
20 changes: 10 additions & 10 deletions producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,23 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException
import org.apache.kafka.common.errors.ProducerFencedException
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.logging.Log
import org.neo4j.logging.internal.LogService
import streams.StreamsEventRouterConfiguration
import streams.asSourceRecordKey
import streams.asSourceRecordValue
import streams.toMap
import streams.StreamsEventRouter
import streams.config.StreamsConfig
import streams.StreamsEventRouterConfiguration
import streams.events.StreamsEvent
import streams.events.StreamsPluginStatus
import streams.events.StreamsTransactionEvent
import streams.extensions.isDefaultDb
import streams.toMap
import streams.utils.JSONUtils
import streams.utils.KafkaValidationUtils.getInvalidTopicsError
import streams.utils.StreamsUtils
import java.util.Properties
import java.util.UUID


class KafkaEventRouter(private val config: Map<String, String>,
private val db: GraphDatabaseService,
private val log: Log): StreamsEventRouter(config, db, log) {
abstract class KafkaEventRouter(private val config: Map<String, String>,
private val db: GraphDatabaseService,
private val log: Log): StreamsEventRouter(config, db, log) {

override val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration
.from(config, db.databaseName(), db.isDefaultDb(), log)
Expand All @@ -41,7 +37,7 @@ class KafkaEventRouter(private val config: Map<String, String>,

private var producer: Neo4jKafkaProducer<ByteArray, ByteArray>? = null
private val kafkaConfig by lazy { KafkaConfiguration.from(config, log) }
private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, eventRouterConfiguration.allTopics(), log) }
private val kafkaAdminService by lazy { AdminService.getInstance(kafkaConfig, eventRouterConfiguration.allTopics(), log) }

override fun printInvalidTopics() {
val invalidTopics = kafkaAdminService.getInvalidTopics()
Expand All @@ -56,6 +52,10 @@ class KafkaEventRouter(private val config: Map<String, String>,
}

override fun start() = runBlocking {

//val adminServiceFactory = AdminServiceFactory(kafkaConfig, eventRouterConfiguration.allTopics(), log)
//kafkaAdminService = adminServiceFactory.getAdminService(kafkaConfig.adminClientApiEnabled)!!

mutex.withLock(producer) {
if (status(producer) == StreamsPluginStatus.RUNNING) {
return@runBlocking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class KafkaConfigurationTest {
"kafka.linger.ms" to 10,
"kafka.fetch.min.bytes" to 1234,
"kafka.topic.discovery.polling.interval" to 0L,
"kafka.streams.log.compaction.strategy" to "delete")
"kafka.streams.log.compaction.strategy" to "delete",
"kafka.admin.client.api.enabled" to false)

val kafkaConfig = KafkaConfiguration.create(map.mapValues { it.value.toString() })

Expand All @@ -46,5 +47,6 @@ class KafkaConfigurationTest {
assertEquals(map["kafka.fetch.min.bytes"].toString(), properties["fetch.min.bytes"])
assertEquals(map["kafka.topic.discovery.polling.interval"], properties["topic.discovery.polling.interval"])
assertEquals(map["kafka.streams.log.compaction.strategy"], properties["streams.log.compaction.strategy"])
assertEquals(map["kafka.admin.client.api.enabled"], properties["admin.client.api.enabled"])
}
}

0 comments on commit fe9465c

Please sign in to comment.