diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index b6d64c79b1df4..508f5c7036bd1 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
 /** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
@@ -34,9 +36,18 @@ private[kafka010] case class KafkaBatchInputPartition(
     failOnDataLoss: Boolean,
     includeHeaders: Boolean) extends InputPartition
 
-private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+    val taskCtx = TaskContext.get()
+    val queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    val batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
+    logInfo(s"Creating Kafka reader topicPartition=${p.offsetRange.topicPartition} " +
+      s"fromOffset=${p.offsetRange.fromOffset} untilOffset=${p.offsetRange.untilOffset}, " +
+      s"for query queryId=$queryId batchId=$batchId taskId=${TaskContext.get().taskAttemptId()} " +
+      s"partitionId=${TaskContext.get().partitionId()}")
+
     KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs,
       p.failOnDataLoss, p.includeHeaders)
   }
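
The patch gets the streaming query ID and micro-batch ID from task-local properties, which Spark propagates from the driver thread to every task launched for that batch. Below is a minimal, self-contained sketch of that mechanism (not part of the patch): it sets the properties by hand since no real streaming query is running, and the literal key strings are assumptions standing in for `StreamExecution.QUERY_ID_KEY` and `MicroBatchExecution.BATCH_ID_KEY`.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object LocalPropertyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("local-property-demo"))
    try {
      // Local properties set on the driver thread travel with every task that
      // thread launches; TaskContext.getLocalProperty reads them on the
      // executor side. Keys are hand-written here and assumed to mirror the
      // Spark constants used in the patch.
      sc.setLocalProperty("sql.streaming.queryId", "demo-query") // assumed QUERY_ID_KEY value
      sc.setLocalProperty("streaming.sql.batchId", "42")         // assumed BATCH_ID_KEY value

      val lines = sc.parallelize(1 to 4, numSlices = 2).mapPartitions { _ =>
        val ctx = TaskContext.get()
        Iterator(
          s"queryId=${ctx.getLocalProperty("sql.streaming.queryId")} " +
            s"batchId=${ctx.getLocalProperty("streaming.sql.batchId")} " +
            s"taskId=${ctx.taskAttemptId()} partitionId=${ctx.partitionId()}")
      }.collect()

      lines.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

In a real micro-batch query the engine sets these properties itself, which is the design the diff leans on: the reader factory can log the query and batch identifiers without plumbing them through `KafkaBatchInputPartition`.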