spark-sql-streaming-KafkaOffsetReader.adoc

KafkaOffsetReader

KafkaOffsetReader relies on the ConsumerStrategy to create a Kafka Consumer.

KafkaOffsetReader creates a Kafka Consumer with group.id (ConsumerConfig.GROUP_ID_CONFIG) configuration explicitly set to nextGroupId (i.e. the given driverGroupIdPrefix followed by nextId).

KafkaOffsetReader is created when:

Table 1. KafkaOffsetReader's Options

fetchOffset.numRetries::
Number of times to retry fetching offsets before giving up.
Default: 3

fetchOffset.retryIntervalMs::
How long (in milliseconds) to wait before retrying an offset fetch.
Default: 1000
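The retry semantics the two options control can be sketched as follows. This is a minimal model of the behavior (retry a failing offset fetch a fixed number of times, sleeping between attempts), not Spark's actual implementation; the names withRetries, numRetries and retryIntervalMs are illustrative:

```scala
// Illustrative model of fetchOffset.numRetries / fetchOffset.retryIntervalMs:
// retry the body up to numRetries times, waiting retryIntervalMs between attempts.
def withRetries[T](numRetries: Int, retryIntervalMs: Long)(body: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  var lastError: Throwable = null
  while (result.isEmpty && attempt <= numRetries) {
    try {
      result = Some(body)
    } catch {
      case e: Throwable =>
        lastError = e
        attempt += 1
        if (attempt <= numRetries) Thread.sleep(retryIntervalMs)
    }
  }
  // All attempts exhausted: rethrow the last failure
  result.getOrElse(throw lastError)
}
```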

KafkaOffsetReader defines the predefined, fixed schema of a Kafka source record (key, value, topic, partition, offset, timestamp, timestampType).

Table 2. KafkaOffsetReader's Internal Registries and Counters

_consumer::
Kafka's Consumer (Consumer[Array[Byte], Array[Byte]])
Initialized when KafkaOffsetReader is created.
Used by fetchTopicPartitions, fetchSpecificOffsets, fetchEarliestOffsets and fetchLatestOffsets.

execContext::
ExecutionContext backed by kafkaReaderThread (so Kafka calls run on the dedicated thread).

groupId::
The current consumer group id (set by nextGroupId).

kafkaReaderThread::
Dedicated single-thread executor for Kafka operations.

maxOffsetFetchAttempts::
The value of the fetchOffset.numRetries option.

nextId::
Counter appended to driverGroupIdPrefix to build unique consumer group ids.
Initially 0

offsetFetchAttemptIntervalMs::
The value of the fetchOffset.retryIntervalMs option.

Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.kafka010.KafkaOffsetReader to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=DEBUG

Refer to Logging.

nextGroupId Internal Method

nextGroupId(): String

nextGroupId sets groupId to the driverGroupIdPrefix followed by - and the current nextId (i.e. [driverGroupIdPrefix]-[nextId]).

In the end, nextGroupId increments the nextId and returns the groupId.

Note
nextGroupId is used exclusively when KafkaOffsetReader is requested for a Kafka Consumer.
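The groupId/nextId bookkeeping can be sketched in plain Scala. This is an illustrative model, not the actual Spark source; GroupIdGenerator is a made-up name:

```scala
// Model of nextGroupId: groupId becomes [driverGroupIdPrefix]-[nextId],
// then nextId is incremented and the new groupId is returned.
class GroupIdGenerator(driverGroupIdPrefix: String) {
  private var nextId = 0                 // initially 0
  private var groupId: String = _

  def nextGroupId(): String = {
    groupId = s"$driverGroupIdPrefix-$nextId"
    nextId += 1
    groupId
  }
}
```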

resetConsumer Internal Method

resetConsumer(): Unit

resetConsumer…​FIXME

Note
resetConsumer is used when…​FIXME

fetchTopicPartitions Method

fetchTopicPartitions(): Set[TopicPartition]
Caution
FIXME
Note
fetchTopicPartitions is used exclusively when KafkaRelation is requested for partition offsets (getPartitionOffsets).

Fetching Earliest Offsets — fetchEarliestOffsets Method

fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
Caution
FIXME
Note
fetchEarliestOffsets is used when KafkaSource does rate limiting (rateLimit) and generates a DataFrame for a batch (when new partitions have been assigned).

Fetching Latest Offsets — fetchLatestOffsets Method

fetchLatestOffsets(): Map[TopicPartition, Long]
Caution
FIXME
Note
fetchLatestOffsets is used when KafkaSource is requested for offsets or initializes initialPartitionOffsets.

withRetriesWithoutInterrupt Internal Method

withRetriesWithoutInterrupt(
  body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]

withRetriesWithoutInterrupt…​FIXME

Note
withRetriesWithoutInterrupt is used when…​FIXME

Creating KafkaOffsetReader Instance

KafkaOffsetReader takes the following when created: a ConsumerStrategy (to create the Kafka Consumer), Kafka parameters for the driver, reader options (see Table 1), and the driverGroupIdPrefix.

KafkaOffsetReader initializes the internal registries and counters.

Fetching Offsets for Selected TopicPartitions — fetchSpecificOffsets Method

fetchSpecificOffsets(
  partitionOffsets: Map[TopicPartition, Long],
  reportDataLoss: String => Unit): KafkaSourceOffset
Figure 1. KafkaOffsetReader's fetchSpecificOffsets

fetchSpecificOffsets requests the Kafka Consumer to poll(0).

fetchSpecificOffsets requests the Kafka Consumer for assigned partitions (using Consumer.assignment()).

fetchSpecificOffsets requests the Kafka Consumer to pause(partitions).

You should see the following DEBUG message in the logs:

DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]

For every partition offset in the input partitionOffsets, fetchSpecificOffsets requests the Kafka Consumer to:

  • seekToEnd for the latest (aka -1)

  • seekToBeginning for the earliest (aka -2)

  • seek for other offsets

In the end, fetchSpecificOffsets creates a collection of Kafka’s TopicPartition and position (using the Kafka Consumer).

Note
fetchSpecificOffsets is used when KafkaSource fetches and verifies initial partition offsets.
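The per-partition seek decision described above can be modeled without a real Kafka Consumer (the sentinel values -1 and -2 are as in the list above; the Seek ADT and seekFor are illustrative names, not Spark's internals):

```scala
// Model of the seek decision in fetchSpecificOffsets:
// -1 means "latest" (seekToEnd), -2 means "earliest" (seekToBeginning),
// anything else is an absolute seek to that offset.
sealed trait Seek
case object SeekToEnd extends Seek
case object SeekToBeginning extends Seek
final case class SeekTo(offset: Long) extends Seek

def seekFor(offset: Long): Seek = offset match {
  case -1L => SeekToEnd
  case -2L => SeekToBeginning
  case o   => SeekTo(o)
}
```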

Creating Kafka Consumer — createConsumer Internal Method

createConsumer(): Consumer[Array[Byte], Array[Byte]]
Note
createConsumer is used when KafkaOffsetReader is created (to initialize the _consumer reference) and when resetConsumer is executed.

Creating Kafka Consumer (Unless Already Available) — consumer Method

consumer: Consumer[Array[Byte], Array[Byte]]

consumer gives the cached Kafka Consumer or creates one itself.

Note
Because the fetch methods access the internal Kafka Consumer through the consumer method, a new Kafka Consumer is created transparently whenever the internal reference becomes null (e.g. after resetConsumer).

consumer…​FIXME

Note
consumer is used when KafkaOffsetReader is requested to fetchTopicPartitions, fetchSpecificOffsets, fetchEarliestOffsets, and fetchLatestOffsets.
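The "cached or create" behavior can be sketched as follows. This is an illustrative model (a String stands in for the Kafka Consumer; ConsumerCache is a made-up name):

```scala
// Model of consumer / resetConsumer: return the cached consumer, or create
// (and cache) a new one whenever the internal reference has become null.
class ConsumerCache(create: () => String) {
  private var _consumer: String = null

  def consumer: String = {
    if (_consumer == null) _consumer = create()
    _consumer
  }

  def resetConsumer(): Unit = {
    _consumer = null   // the next consumer access creates a fresh one
  }
}
```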

Closing — close Method

close(): Unit

close stops the Kafka Consumer (if the Kafka Consumer is available).

close requests the ExecutorService to shut down.
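The two steps above can be sketched as follows (an illustrative model, not Spark's source; a no-op AutoCloseable stands in for the Kafka Consumer):

```scala
import java.util.concurrent.Executors

// Stand-ins for the internal registries: a dedicated single-thread executor
// (kafkaReaderThread) and an optionally-available consumer (_consumer).
val kafkaReaderThread = Executors.newSingleThreadExecutor()
val noopConsumer: AutoCloseable = () => ()
var consumerOpt: Option[AutoCloseable] = Some(noopConsumer)

def close(): Unit = {
  consumerOpt.foreach(_.close())   // stop the consumer if available
  consumerOpt = None
  kafkaReaderThread.shutdown()     // shut down the executor service
}
```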

Note

close is used when:

runUninterruptibly Internal Method

runUninterruptibly[T](body: => T): T

runUninterruptibly…​FIXME

Note
runUninterruptibly is used when…​FIXME

stopConsumer Internal Method

stopConsumer(): Unit

stopConsumer…​FIXME

Note
stopConsumer is used when…​FIXME

toString Method

toString(): String
Note
toString is part of the java.lang.Object contract for the string representation of an object.

toString…​FIXME