Skip to content

Commit

Permalink
Adding xinfra identifier code to kafka api handler (#506)
Browse files Browse the repository at this point in the history
* Update Observer.scala

* another test commit

* reverting test commits and updating observer

* fixing bulk comment

* fixing bulk comments

* adding observer code in kafka apis

* fixing build issues

* committing change

* adding client id to observer

* fixing NoOpObserver

* moving logic outside unofficial client logging enabled check

* adding a null check and test

* adding testing line

* adding a test

* retriggering build

* changing software name lookup logic

* cleaning up logic

* fixing variable name

* addressing comments

* fix build issue

* adding more comments
  • Loading branch information
JobseRyan authored Mar 18, 2024
1 parent 962fabd commit a339775
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 1 deletion.
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1941,10 +1941,13 @@ class KafkaApis(val requestChannel: RequestChannel,
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
val apiVersionRequest = request.body[ApiVersionsRequest]
val softwareName = apiVersionRequest.data.clientSoftwareName().split("-").last

val isXinfraClient = (softwareName != null && softwareName.toLowerCase.contains("xinfra"))
observer.trackClientLibrary(isXinfraClient, request.context.clientId())

if (config.unofficialClientLoggingEnable) {
// Check if the last part of clientSoftwareName (after commitId) is an unexpected software name
val softwareName = apiVersionRequest.data.clientSoftwareName().split("-").last
if (!config.expectedClientSoftwareNames.contains(softwareName)) {
val clientIdentity = request.context.clientId() + " " + request.context.clientAddress() + " " + request.context.principal()
unofficialClientsCache.get(clientIdentity)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/NoOpObserver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ class NoOpObserver extends Observer {
*/
def close(timeout: Long, unit: TimeUnit): Unit = {}

/**
 * No-op implementation of the client-library tracking hook: discards the observation.
 *
 * @param isXinfraClient whether the caller identified this client as a xinfra client
 * @param clientId the client id of the request being tracked
 */
def trackClientLibrary(isXinfraClient: Boolean, clientId: String): Unit = {}

}
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/server/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ trait Observer extends Configurable {
*/
def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit

/**
 * Hook to track the client library type so different client types can be compared.
 *
 * NOTE: this is invoked on the request-handling hot path, so implementations must be
 * lightweight — no blocking I/O and no heavy computation.
 *
 * @param isXinfraClient whether this clientId was identified as a xinfra client
 * @param clientId the clientId for this specific request
 */
def trackClientLibrary(isXinfraClient: Boolean, clientId: String): Unit

/**
* Close the observer with timeout.
*
Expand Down
31 changes: 31 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.ApiVersionsRequest
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
Expand Down Expand Up @@ -497,6 +498,36 @@ class KafkaApisTest {
)
}

@Test
def testClientLibraryVersionObserverCaching(): Unit = {
  val requestBuilder = new ApiVersionsRequest.Builder()

  // Expect exactly one trackClientLibrary call for this request. The clientId must match;
  // use a matcher for the boolean flag since whether the default ApiVersionsRequest carries
  // a xinfra software name is an implementation detail of the identification logic.
  observer.trackClientLibrary(EasyMock.anyBoolean(), EasyMock.eq(clientId))
  EasyMock.expectLastCall().once()

  val kafkaApis = createKafkaApis(enableForwarding = true)

  val topicHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion,
    clientId, 0)

  val request = buildRequest(requestBuilder.build(topicHeader.apiVersion))

  if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
    // The controller check only makes sense for ZK clusters. For KRaft,
    // controller requests are handled on a separate listener, so there
    // is no choice but to forward them.
    EasyMock.expect(controller.isActive).andReturn(false)
  }

  expectNoThrottling(request)

  EasyMock.expect(forwardingManager.forwardRequest(
    EasyMock.eq(request),
    anyObject[Option[AbstractResponse] => Unit]()
  )).once()

  // Fix: `observer` must be switched to replay mode here — previously it was omitted, so
  // the expectation above was never armed and the test passed even if trackClientLibrary
  // was never invoked during request handling.
  EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller,
    forwardingManager, observer)

  kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)

  // Confirm the observer actually saw the client-library observation for this request.
  EasyMock.verify(observer)
}

private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
testForwardableApi(
createKafkaApis(enableForwarding = true),
Expand Down

0 comments on commit a339775

Please sign in to comment.