Skip to content

Commit

Permalink
Counts offset gaps
Browse files Browse the repository at this point in the history
  • Loading branch information
solsson committed Feb 26, 2019
1 parent d330aa5 commit bb89da5
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class KeyvalueUpdateProcessor implements KeyvalueUpdate, Processor<String
.name("onupdate_completed").help("Total on-update requests completed").register();
static final Counter onUpdateCompletedOutOfOrder = Counter.build()
.name("onupdate_completed_outoforder").help("On-update requests completed out of order with previous").register();
static final Counter offsetsNotProcessed = Counter.build()
.name("offsets_not_processed").help("The processor won't see null key messages, one reason to count gaps in the offset sequence").register();

private static final String SOURCE_NAME = "Source";
private static final String PROCESSOR_NAME = "KeyvalueUpdate";
Expand Down Expand Up @@ -203,15 +205,17 @@ private static class OnUpdateCompletionLogging implements OnUpdate.Completion {
if (previous == null) {
logger.info("This is the first on-update for topic {} partition {} at offset {}", record.getTopic(), record.getPartition(), record.getOffset());
} else {
logger.debug("Got onupdate completeion for topic {} partition {} offset {} previous offset {}", record.getTopic(), record.getPartition(), record.getOffset(), previous.record.getOffset());
logger.debug("Got onupdate completion for topic {} partition {} offset {} previous offset {}", record.getTopic(), record.getPartition(), record.getOffset(), previous.record.getOffset());
// sanity checks here and the whole previous tracking can probably be removed once we have decent e2e coverage
this.previous = previous;
UpdateRecord p = previous.record;
if (!record.getTopic().equals(p.getTopic())) throw new IllegalArgumentException("Mismatch with previous, topics: " + record.getTopic() + " != " + p.getTopic());
if (record.getPartition() != p.getPartition()) throw new IllegalArgumentException("Mismatch with previous, topic " + record.getTopic() + " partitions: " + record.getPartition() + "!=" + p.getPartition());
if (record.getOffset() == p.getOffset()) throw new IllegalArgumentException("Duplicate completion logging for topic " + record.getTopic() + " partition " + record.getPartition() + " offset " + p.getOffset());
long offsetoffset = record.getOffset() - p.getOffset();
if (offsetoffset == 0) throw new IllegalArgumentException("Duplicate completion logging for topic " + record.getTopic() + " partition " + record.getPartition() + " offset " + p.getOffset());
if (offsetoffset < 0) throw new IllegalArgumentException("Completion tracking created in reverse offset order, topic " + record.getTopic() + " partition " + record.getPartition() + ": from " + p.getOffset() + " to " + record.getOffset());
// null keys will be ignored so there might be gaps, but we should be able to create these logging instances in offset order
if (record.getOffset() < p.getOffset()) throw new IllegalArgumentException("Completion tracking created in reverse offset order, topic " + record.getTopic() + " partition " + record.getPartition() + ": from " + p.getOffset() + " to " + record.getOffset());
if (offsetoffset > 1) offsetsNotProcessed.inc(offsetoffset);
}
onUpdatePending.inc();
}
Expand Down

0 comments on commit bb89da5

Please sign in to comment.