HOTFIX - LIKAFKA-21968: Add broker-side observer interface and NoOpObserver implementation (#6)

Reviewers: Radai Rosenblatt
Lincong Li committed Mar 26, 2019
1 parent a99b4f8 commit fb2a630
Showing 7 changed files with 182 additions and 1 deletion.
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -80,6 +80,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val metadataCache: MetadataCache,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val observer: Observer,
val quotas: QuotaManagers,
val fetchManager: FetchManager,
brokerTopicStats: BrokerTopicStats,
@@ -2374,6 +2375,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
try {
observer.observe(request, response)
} catch {
case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e)
}

new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
case None =>
new RequestChannel.NoOpResponse(request)
23 changes: 23 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -63,6 +63,10 @@ object Defaults {
/************* Authorizer Configuration ***********/
val AuthorizerClassName = ""

/** ********* Broker-side configuration ***********/
val ObserverClassName = "kafka.server.NoOpObserver"
val ObserverShutdownTimeoutMs = 2000

/** ********* Socket Server Configuration ***********/
val Port = 9092
val HostName: String = new String("")
@@ -278,6 +282,11 @@ object KafkaConfig {
val ProducerBatchDecompressionEnableProp = "producer.batch.decompression.enable"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"

/** ********* Broker-side observer Configuration ****************/
val ObserverClassNameProp = "observer.class.name"
val ObserverShutdownTimeoutMsProp = "observer.shutdown.timeout"

/** ********* Socket Server Configuration ***********/
val PortProp = "port"
val HostNameProp = "host.name"
@@ -797,6 +806,12 @@ object KafkaConfig {
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."

/** ********* Broker-side Observer Configuration *********/
val ObserverClassNameDoc = "The name of the observer class that is used to observe requests and/or responses on the broker."
val ObserverShutdownTimeoutMsDoc = "The maximum time allowed for closing/shutting down an observer. This property must be greater than " +
"zero. Most of the shutdown time is spent flushing the observed stats, so a reasonable timeout should be close to the time it takes " +
"to flush the stats."

private val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
@@ -832,6 +847,10 @@
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)

/************* Broker-side Observer Configuration ***********/
.define(ObserverClassNameProp, STRING, Defaults.ObserverClassName, MEDIUM, ObserverClassNameDoc)
.define(ObserverShutdownTimeoutMsProp, LONG, Defaults.ObserverShutdownTimeoutMs, atLeast(1), MEDIUM, ObserverShutdownTimeoutMsDoc)

/** ********* Socket Server Configuration ***********/
.define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
.define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
@@ -1121,6 +1140,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/************* Authorizer Configuration ***********/
val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)

/************* Broker-side Observer Configuration ********/
val ObserverClassName: String = getString(KafkaConfig.ObserverClassNameProp)
val ObserverShutdownTimeoutMs: Long = getLong(KafkaConfig.ObserverShutdownTimeoutMsProp)

/** ********* Socket Server Configuration ***********/
val hostName = getString(KafkaConfig.HostNameProp)
val port = getInt(KafkaConfig.PortProp)
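As a usage sketch (not part of this commit), the two new settings could be placed in the broker's server.properties; the observer class name below is a hypothetical example:

# Plug in a custom observer implementation (defaults to kafka.server.NoOpObserver).
observer.class.name=com.example.CountingObserver
# Give the observer up to 5 seconds to flush its stats during shutdown (must be greater than 0).
observer.shutdown.timeout=5000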
15 changes: 14 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -116,6 +116,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP

var apis: KafkaApis = null
var authorizer: Option[Authorizer] = None
var observer: Observer = null
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null

@@ -304,13 +305,22 @@
authZ
}

observer = try {
CoreUtils.createObject[Observer](config.ObserverClassName)
} catch {
case e: Exception =>
error(s"Creating observer instance from the given class name ${config.ObserverClassName} failed.", e)
new NoOpObserver
}
observer.configure(config.originals())

val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, observer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
@@ -606,6 +616,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (apis != null)
CoreUtils.swallow(apis.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)

CoreUtils.swallow(observer.close(config.ObserverShutdownTimeoutMs, TimeUnit.MILLISECONDS), this)

if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown(), this)

42 changes: 42 additions & 0 deletions core/src/main/scala/kafka/server/NoOpObserver.scala
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util.Map
import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
import org.apache.kafka.common.requests.AbstractResponse

/**
* An observer implementation that performs no operation and serves as a placeholder.
*/
class NoOpObserver extends Observer {

def configure(configs: Map[String, _]): Unit = {}

/**
* Observe the given request and its response. This implementation does nothing.
*/
def observe(request: RequestChannel.Request, response: AbstractResponse): Unit = {}

/**
* Close the observer with timeout.
*/
def close(timeout: Long, unit: TimeUnit): Unit = {}

}
90 changes: 90 additions & 0 deletions core/src/main/scala/kafka/server/Observer.scala
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
import org.apache.kafka.common.requests.AbstractResponse
import org.apache.kafka.common.Configurable

/**
* Top level interface that all pluggable observers must implement. Kafka will read the 'observer.class.name' config
* value at startup time, create an instance of the specified class using the default constructor, and call its
* 'configure' method.
*
* From that point onwards, every pair of request and response will be routed to the 'observe' method.
*
* If 'observer.class.name' has no value specified or the specified class does not exist, the <code>NoOpObserver</code>
* will be used as a placeholder.
*/
trait Observer extends Configurable {

/**
* Observe the given request and its response.
*
* @param request the request being observed
* @param response the response to the request
*/
def observe(request: RequestChannel.Request, response: AbstractResponse): Unit

/**
* Close the observer within the given timeout.
*
* @param timeout the maximum time to wait for the observer to close.
* @param unit the time unit of the timeout.
*/
def close(timeout: Long, unit: TimeUnit): Unit
}

object Observer {

/**
* Generates a description of the given request and response, intended mainly for debugging purposes.
*
* @param request the request being described
* @param response the response to the request
*/
def describeRequestAndResponse(request: RequestChannel.Request, response: AbstractResponse): String = {
var requestDesc = "Request"
var responseDesc = "Response"
try {
if (request == null) {
requestDesc += " null"
} else {
requestDesc += (" header: " + request.header)
requestDesc += (" from service with principal: " +
request.session.sanitizedUser +
" IP address: " + request.session.clientAddress)
}
requestDesc += " | " // Separate the response description from the request description

if (response == null) {
responseDesc += " null"
} else {
responseDesc += (if (response.errorCounts == null || response.errorCounts.size == 0) {
" with no error"
} else {
" with errors: " + response.errorCounts
})
}
} catch {
case e: Exception => return e.toString // If describing fails, return the exception message directly
}
requestDesc + responseDesc
}
}
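For illustration only, a minimal custom observer satisfying this trait might look like the following sketch; the package, class name, and counter are hypothetical and not part of this commit:

package com.example

import java.util.Map
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import kafka.network.RequestChannel
import kafka.server.Observer
import org.apache.kafka.common.requests.AbstractResponse

/**
 * A hypothetical observer that counts every request/response pair it sees.
 * It would be enabled with observer.class.name=com.example.CountingObserver.
 */
class CountingObserver extends Observer {
  private val observed = new AtomicLong(0)

  // Called once at startup with the broker's original config map.
  def configure(configs: Map[String, _]): Unit = {}

  // Called for every request/response pair; exceptions thrown here are caught and logged by KafkaApis.
  def observe(request: RequestChannel.Request, response: AbstractResponse): Unit =
    observed.incrementAndGet()

  // Called during broker shutdown; flush any buffered stats within the given timeout.
  def close(timeout: Long, unit: TimeUnit): Unit = {}
}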
2 changes: 2 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -68,6 +68,7 @@ class KafkaApisTest {
private val brokerId = 1
private val metadataCache = new MetadataCache(brokerId)
private val authorizer: Option[Authorizer] = None
private val observer = EasyMock.createNiceMock(classOf[Observer])
private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
@@ -100,6 +101,7 @@ class KafkaApisTest {
metadataCache,
metrics,
authorizer,
observer,
quotas,
fetchManager,
brokerTopicStats,
4 changes: 4 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -728,6 +728,10 @@ class KafkaConfigTest {
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

// Broker-side observer configs
case KafkaConfig.ObserverClassNameProp => // ignore since even if the class name is invalid, a NoOpObserver class is used instead
case KafkaConfig.ObserverShutdownTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1", "0")

case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})