[SPARK-43233][SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

### What changes were proposed in this pull request?
We add logging when creating the batch reader, with the task ID, topic, partition, and offset range included.
The log line looks like the following:

23/04/18 22:35:38 INFO KafkaBatchReaderFactory: Creating Kafka reader partitionId=1 partition=StreamingDustTest-KafkaToKafkaTopic-4ccf8662-c3ca-4f3b-871e-1853c0e61765-source-2 fromOffset=0 untilOffset=3 queryId=b5b806c3-ebf3-432e-a9a7-d882d474c0f5 batchId=0 taskId=1
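
For context, here is a minimal, hypothetical sketch (the broker address, topic name, and checkpoint path are made up) of a Kafka micro-batch query that would emit the new log line once per reader task on the executors:

```scala
import org.apache.spark.sql.SparkSession

object KafkaReaderLoggingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-reader-logging-example")
      .getOrCreate()

    // Source: each micro-batch plans KafkaBatchInputPartitions, and
    // KafkaBatchReaderFactory.createReader now logs the topic partition
    // and offset range that each task reads.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()

    // Sink: any sink works; the console sink keeps the example self-contained.
    val query = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/kafka-reader-logging-example")
      .start()

    query.awaitTermination()
  }
}
```

Grepping executor logs for "Creating Kafka reader" then ties each task attempt to the topic partition and offset range it consumed in a given batch.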

### Why are the changes needed?
Right now, for Structured Streaming from Kafka, it is hard to find which task is handling which topic/partition and offset range.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Ran KafkaMicroBatchV2SourceSuite and verified that the logging output contains the needed information. Also ran a small cluster test and observed the logs.

Closes apache#40905 from siying/kafka_logging.

Lead-authored-by: Siying Dong <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
2 people authored and HeartSaVioR committed Apr 25, 2023
1 parent 35f0d8e commit 82138f4
Showing 1 changed file with 12 additions and 1 deletion.

@@ -19,11 +19,13 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
 /** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
@@ -34,9 +36,18 @@ private[kafka010] case class KafkaBatchInputPartition(
     failOnDataLoss: Boolean,
     includeHeaders: Boolean) extends InputPartition
 
-private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+    val taskCtx = TaskContext.get()
+    val queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    val batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
+    logInfo(s"Creating Kafka reader topicPartition=${p.offsetRange.topicPartition} " +
+      s"fromOffset=${p.offsetRange.fromOffset} untilOffset=${p.offsetRange.untilOffset}, " +
+      s"for query queryId=$queryId batchId=$batchId taskId=${TaskContext.get().taskAttemptId()} " +
+      s"partitionId=${TaskContext.get().partitionId()}")
+
     KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs,
       p.failOnDataLoss, p.includeHeaders)
   }
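
For readers unfamiliar with how queryId/batchId become visible inside a task, here is a standalone sketch (not part of this change; the property key below is invented) of the local-property mechanism the new logInfo call relies on: properties set on the driver via SparkContext.setLocalProperty are propagated to tasks and read back through TaskContext.getLocalProperty.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object LocalPropertyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("local-property-demo").setMaster("local[2]"))

    // Analogous to the streaming engine setting queryId/batchId on the driver
    // before launching a micro-batch's jobs (the key name here is made up).
    sc.setLocalProperty("demo.batchId", "42")

    sc.parallelize(1 to 4, numSlices = 2).foreachPartition { _ =>
      val ctx = TaskContext.get()
      // Every task sees the driver-side property plus its own task/partition IDs,
      // which is what the new Kafka reader log line stitches together.
      println(s"batchId=${ctx.getLocalProperty("demo.batchId")} " +
        s"taskId=${ctx.taskAttemptId()} partitionId=${ctx.partitionId()}")
    }

    sc.stop()
  }
}
```

Running this locally prints one line per task, mirroring how the new log message can be correlated across executors.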
