Skip to content

Commit

Permalink
Rename caller protected methods to use fetch prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Oct 6, 2023
1 parent 6223bc1 commit 6d3d2d1
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,22 @@ private long getEarliestOrLatestOffset(
public long getPartitionOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
isTopicPartitionValid(pubSubTopicPartition);
try (AutoCloseableLock ignore = AutoCloseableLock.of(pubSubConsumerLock)) {
Long result = offsetsForTimesUnprotected(pubSubTopicPartition, timestamp);
Long result = fetchOffsetsForTimes(pubSubTopicPartition, timestamp);
if (result == null) {
result = getOffsetByTimeIfOutOfRangeUnprotected(pubSubTopicPartition, timestamp);
result = fetchOffsetByTimeIfOutOfRange(pubSubTopicPartition, timestamp);
} else if (result == -1L) {
// The given timestamp exceed the timestamp of the last message. So return the last offset.
logger.warn("Offsets result is empty. Will complement with the last offsets.");
result = endOffsetsWithRetryUnprotected(pubSubTopicPartition) + 1;
result = fetchEndOffsetsWithRetry(pubSubTopicPartition) + 1;
}
return result;
}
}

private Long offsetsForTimesUnprotected(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
/**
* This method needs to be called with the pubSubConsumerLock held.
*/
private Long fetchOffsetsForTimes(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
return RetryUtils.executeWithMaxAttemptAndExponentialBackoff(
() -> pubSubConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, kafkaOperationTimeout),
25,
Expand All @@ -180,7 +183,10 @@ private Long offsetsForTimesUnprotected(PubSubTopicPartition pubSubTopicPartitio
PUBSUB_RETRIABLE_FAILURES);
}

private Long endOffsetsWithRetryUnprotected(PubSubTopicPartition partition) {
/**
* This method needs to be called with the pubSubConsumerLock held.
*/
private Long fetchEndOffsetsWithRetry(PubSubTopicPartition partition) {
return RetryUtils.executeWithMaxAttemptAndExponentialBackoff(
() -> pubSubConsumerAdapter.endOffset(partition),
25,
Expand Down Expand Up @@ -410,14 +416,14 @@ private void isTopicPartitionValid(PubSubTopicPartition pubSubTopicPartition) {
public long getOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
isTopicPartitionValid(pubSubTopicPartition);
try (AutoCloseableLock ignore = AutoCloseableLock.of(pubSubConsumerLock)) {
return getOffsetByTimeIfOutOfRangeUnprotected(pubSubTopicPartition, timestamp);
return fetchOffsetByTimeIfOutOfRange(pubSubTopicPartition, timestamp);
}
}

/**
* Call this function with the consumer lock held.
*/
private long getOffsetByTimeIfOutOfRangeUnprotected(PubSubTopicPartition pubSubTopicPartition, long timestamp)
private long fetchOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartition, long timestamp)
throws PubSubTopicDoesNotExistException {
long latestOffset = getLatestOffset(pubSubTopicPartition);
if (latestOffset <= 0) {
Expand Down

0 comments on commit 6d3d2d1

Please sign in to comment.