Skip to content

Commit

Permalink
Merge pull request #1 from Morningstar/master
Browse files Browse the repository at this point in the history
Merge
  • Loading branch information
VictorZeng authored Mar 26, 2018
2 parents 01126e4 + 4accd36 commit 4721c8c
Show file tree
Hide file tree
Showing 24 changed files with 1,245 additions and 222 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ This is a small web app, you can run it locally or on a server, as long as you h
```
java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
-cp KafkaOffsetMonitor-assembly-0.4.0.jar \
-cp KafkaOffsetMonitor-assembly-0.4.6.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 \
Expand Down Expand Up @@ -126,7 +126,7 @@ As long as this is true you will need to use local maven repo and just publish K
Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running app:

```
java -cp KafkaOffsetMonitor-assembly-0.3.0.jar:kafka-offset-monitor-another-db-reporter.jar \
java -cp KafkaOffsetMonitor-assembly-0.4.6.jar:kafka-offset-monitor-another-db-reporter.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zkserver01,zkserver02 \
--port 8080 \
Expand All @@ -141,7 +141,3 @@ Contributing
============

The KafkaOffsetMonitor is released under the Apache License and we **welcome any contributions** within this license. Any pull request is welcome and will be reviewed and merged as quickly as possible.

Because this open source tool is released by [Quantifind](http://www.quantifind.com) as a company, if you want to submit a pull request, you will have to sign the following simple contributors agreement:
- If you are an individual, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1RS7qEjq3cCmJ1665UhoCMK8541Ms7KyU3kVFoO4CR_I/) and send it back to [email protected]
- If you are contributing changes that you did as part of your work, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1kNwLT4qG3G0Ct2mEuNdBGmKDYuApN1CpQtZF8TSVTjE/) and send it back to [email protected]
32 changes: 18 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
name := "KafkaOffsetMonitor"
version := "0.4.1-SNAPSHOT"
scalaVersion := "2.11.8"
version := "0.4.6-SNAPSHOT"
scalaVersion := "2.11.11"
organization := "com.quantifind"

scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature")

mainClass in Compile := Some("com.quantifind.kafka.offsetapp.OffsetGetterWeb")

libraryDependencies ++= Seq(
"log4j" % "log4j" % "1.2.17",
"net.databinder" %% "unfiltered-filter" % "0.8.4",
"net.databinder" %% "unfiltered-jetty" % "0.8.4",
"net.databinder" %% "unfiltered-json4s" % "0.8.4",
"com.quantifind" %% "sumac" % "0.3.0",
"org.apache.kafka" %% "kafka" % "0.9.0.1",
"org.reflections" % "reflections" % "0.9.10",
"com.twitter" %% "util-core" % "6.40.0",
"com.typesafe.slick" %% "slick" % "2.1.0",
"org.xerial" % "sqlite-jdbc" % "3.7.2",
"org.mockito" % "mockito-all" % "1.10.19" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
"log4j" % "log4j" % "1.2.17",
"net.databinder" %% "unfiltered-filter" % "0.8.4",
"net.databinder" %% "unfiltered-jetty" % "0.8.4",
"net.databinder" %% "unfiltered-json4s" % "0.8.4",
"com.quantifind" %% "sumac" % "0.3.0",
"org.apache.kafka" %% "kafka" % "0.9.0.1",
"org.reflections" % "reflections" % "0.9.11",
"com.twitter" %% "util-core" % "7.1.0",
"com.typesafe.slick" %% "slick" % "2.1.0",
"org.xerial" % "sqlite-jdbc" % "3.18.0",
"com.google.code.gson" % "gson" % "2.8.2",
"com.google.guava" % "guava" % "20.0",
"javax.ws.rs" % "javax.ws.rs-api" % "2.0-m16",
"org.glassfish.jersey.core" % "jersey-client" % "2.25.1",
"org.mockito" % "mockito-all" % "1.10.19" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")

assemblyMergeStrategy in assembly := {
case "about.html" => MergeStrategy.discard
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
project.organization=com.quantifind
project.name=KafkaOffsetMonitor
project.version=0.4
sbt.version=0.13.13
sbt.version=0.13.16
50 changes: 50 additions & 0 deletions src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.morningstar.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

/**
 * Immutable value object describing a single committed offset for a consumer group:
 * which group committed, whether that group is currently active, and the
 * topic/partition plus offset metadata (offset value and commit timestamp).
 */
public class KafkaCommittedOffset {

    // All fields are set once in the constructor; the class is immutable.
    private final String groupName;
    private final boolean groupIsActive;
    private final String topicName;
    private final int partitionId;
    private final KafkaCommittedOffsetMetadata committedOffset;


    /**
     * @param groupName       consumer group name; must be non-null and non-empty
     * @param groupIsActive   whether the group currently has live members
     * @param topicName       topic the offset was committed for; non-null, non-empty
     * @param partitionId     partition id; must be >= 0
     * @param committedOffset committed offset value; must be >= 0
     * @param committedMillis commit timestamp in epoch millis; must be >= 0
     * @throws IllegalArgumentException if any argument fails validation
     */
    public KafkaCommittedOffset(String groupName, boolean groupIsActive, String topicName, int partitionId, long committedOffset, long committedMillis) {

        Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName must not be NULL or empty.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName must not be NULL or empty.");
        Preconditions.checkArgument(partitionId > -1, "partitionId must be greater than or equal to 0.");
        Preconditions.checkArgument(committedOffset > -1, "committedOffset must be greater than or equal to 0.");
        Preconditions.checkArgument(committedMillis > -1, "committedMillis must be greater than or equal to 0.");

        this.groupName = groupName;
        this.groupIsActive = groupIsActive;
        this.topicName = topicName;
        this.partitionId = partitionId;
        this.committedOffset = new KafkaCommittedOffsetMetadata(committedOffset, committedMillis);
    }


    public String getGroupName() {
        return groupName;
    }

    public boolean getGroupIsActive() {
        return groupIsActive;
    }

    public String getTopicName() {
        return topicName;
    }

    public int getPartitionId() {
        return partitionId;
    }

    public KafkaCommittedOffsetMetadata getCommittedOffset() {
        return committedOffset;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.morningstar.kafka;

import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;

/**
 * Offset metadata for a committed consumer offset, extending the base
 * {@code KafkaOffsetMetadata} (offset + timestamp) with a consumer lag value.
 *
 * <p>Lag semantics: {@code -1} means "lag not yet computed"; any value
 * {@code >= 0} is an actual lag measurement. Lag is mutable so it can be
 * filled in after construction via {@link #setLag(long)}.
 */
public class KafkaCommittedOffsetMetadata extends KafkaOffsetMetadata {

    // Serialized by Gson; -1 is the sentinel for "unknown lag".
    @Expose private long lag = -1;


    /**
     * @param offsetMetadata base offset/timestamp to copy
     * @param lag            consumer lag; must be >= -1
     */
    public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata offsetMetadata, long lag) {
        super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp());
        verifyParameters(lag);
        this.lag = lag;
    }

    /**
     * @param committedOffset committed offset value
     * @param timestamp       commit timestamp in epoch millis
     * @param lag             consumer lag; must be >= -1
     */
    public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp, long lag) {
        super(committedOffset, timestamp);
        verifyParameters(lag);
        this.lag = lag;
    }

    /** Copies offset/timestamp from {@code offsetMetadata}; lag stays at the -1 sentinel. */
    public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata offsetMetadata) {
        super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp());
    }

    /** Constructs with the given offset/timestamp; lag stays at the -1 sentinel. */
    public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp) {
        super(committedOffset, timestamp);
    }

    // Validates the lag argument: -1 (unknown) and non-negative values are legal.
    private void verifyParameters(long lag) {

        // Intent-revealing form of the original "lag > -2" check.
        Preconditions.checkArgument(lag >= -1, "lag must not be less than -1.");
    }


    public long getLag() {
        return lag;
    }

    public void setLag(long lag) {
        this.lag = lag;
    }
}
168 changes: 168 additions & 0 deletions src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.morningstar.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import com.google.gson.annotations.Expose;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;


/**
 * Aggregated state for one Kafka consumer group: the set of topic-partitions it
 * has committed offsets for, the most recent commit timestamp seen, a derived
 * health {@link Status}, and a "complete" flag that turns on once every tracked
 * partition has accumulated enough committed-offset history.
 *
 * <p>Thread-safety: {@code topicPartitions} is a concurrent set and
 * {@link #updateStatus()} is synchronized; other mutators are driven by
 * {@link #addCommittedOffsetInfo(KafkaCommittedOffset)}.
 */
public class KafkaConsumerGroup {

    // Number of committed offsets a partition must accumulate before the
    // group's history is considered "complete" enough for status evaluation.
    private static final long COMPLETE_THRESHOLD = 10;

    @Expose private String consumerGroupName;
    @Expose private boolean isActive;
    @Expose private boolean complete;
    @Expose private long mostRecentCommittedMillis;
    @Expose private Status status;
    @Expose private Set<KafkaTopicPartition> topicPartitions;


    /**
     * @param consumerGroupName group name; must be non-null and non-empty
     * @throws IllegalArgumentException if the name is null or empty
     */
    public KafkaConsumerGroup(String consumerGroupName) {

        Preconditions.checkArgument(!Strings.isNullOrEmpty(consumerGroupName), "consumerGroupName cannot be NULL or empty.");

        this.consumerGroupName = consumerGroupName;
        this.isActive = false;
        this.complete = false;
        this.mostRecentCommittedMillis = -1;
        this.status = Status.OK;
        this.topicPartitions = Sets.newConcurrentHashSet();
    }


    public String getConsumerGroupName() {
        return consumerGroupName;
    }

    public boolean getComplete() { return complete; }

    /** Most recent commit timestamp (epoch millis) seen for this group. Name kept for API compatibility. */
    public long getMaxCommitedMillis() {
        return mostRecentCommittedMillis;
    }

    public boolean isActive() {
        return isActive;
    }

    public Set<KafkaTopicPartition> getTopicPartitions() {

        return topicPartitions;
    }

    /** @return the distinct topic names this group has committed offsets for */
    public Set<String> getTopics() {

        return topicPartitions.stream()
                .map(KafkaTopicPartition::getTopicName)
                .collect(Collectors.toSet());
    }

    /**
     * Recomputes the group status from its partitions.
     *
     * <p>Rules: an inactive group is {@code ERR}. Otherwise any partition in
     * {@code STOP}, {@code REWIND} or {@code STALL} makes the group {@code ERR};
     * any partition in {@code WARN} makes the group at least {@code WARN};
     * else the group is {@code OK}.
     */
    public synchronized void updateStatus() {

        if (!isActive) {
            this.status = Status.ERR;
            return;
        }

        Status newStatus = Status.OK;

        for (KafkaTopicPartition topicPartition : topicPartitions) {

            Status partitionStatus = topicPartition.getStatus();

            // Any STOP/REWIND/STALL partition forces ERR; nothing can outrank it.
            if (Status.STOP == partitionStatus || Status.REWIND == partitionStatus || Status.STALL == partitionStatus) {
                newStatus = Status.ERR;
                break;
            }

            // WARN must NOT break the scan: a later partition may still
            // escalate the group to ERR. (Breaking here was a bug — it froze
            // the status at WARN even when a subsequent partition was stalled.)
            if (Status.WARN == partitionStatus) {
                newStatus = Status.WARN;
            }
        }

        this.status = newStatus;
    }


    // Finds the tracked partition matching (topic, partitionId), if any.
    private Optional<KafkaTopicPartition> getTopicPartition(String topic, int partitionId) {

        return topicPartitions.stream()
                .filter(tp -> (tp.getTopicName().equals(topic) && tp.getPartitionId() == partitionId))
                .findFirst();
    }

    // Adds the committed offset to the matching partition, creating the
    // partition entry on first sight of this (topic, partition) pair.
    private void upsertTopicPartition(KafkaCommittedOffset kafkaCommittedOffset) {

        Preconditions.checkArgument(!Strings.isNullOrEmpty(kafkaCommittedOffset.getTopicName()), "topic cannot be NULL or empty.");
        Preconditions.checkArgument(kafkaCommittedOffset.getPartitionId() >= 0, "partitionId must be greater-than or equal-to zero.");

        String incomingTopicName = kafkaCommittedOffset.getTopicName();
        int incomingPartitionId = kafkaCommittedOffset.getPartitionId();

        Optional<KafkaTopicPartition> existingTopicPartition = getTopicPartition(incomingTopicName, incomingPartitionId);

        if (existingTopicPartition.isPresent()) {
            // Append committed offset info to existing set item
            existingTopicPartition.get().addCommittedOffset(kafkaCommittedOffset.getCommittedOffset());
        } else {
            // Add a new entry to the map
            KafkaTopicPartition newTopicPartition = new KafkaTopicPartition(incomingTopicName, incomingPartitionId);
            newTopicPartition.addCommittedOffset(kafkaCommittedOffset.getCommittedOffset());
            topicPartitions.add(newTopicPartition);
        }
    }

    // Tracks the high-water-mark commit timestamp across all partitions.
    private void setMostRecentCommittedMillis(long mostRecentCommittedMillis) {
        if (this.mostRecentCommittedMillis < mostRecentCommittedMillis) {
            this.mostRecentCommittedMillis = mostRecentCommittedMillis;
        }
    }

    // Group history is "complete" once every partition has at least
    // COMPLETE_THRESHOLD committed offsets recorded.
    private void updateCompleteFlag() {

        this.complete = topicPartitions.stream()
                .noneMatch(f -> f.getCommittedOffsets().size() < COMPLETE_THRESHOLD);
    }

    /**
     * Records one committed-offset observation: updates the commit-time
     * high-water mark, the group's active flag, the per-partition offset
     * history, and the complete flag.
     */
    public void addCommittedOffsetInfo(KafkaCommittedOffset kafkaCommittedOffset) {

        setMostRecentCommittedMillis(kafkaCommittedOffset.getCommittedOffset().getTimestamp());
        this.isActive = kafkaCommittedOffset.getGroupIsActive();
        upsertTopicPartition(kafkaCommittedOffset);
        updateCompleteFlag();
    }


    /** Identity is the group name only, consistent with {@link #hashCode()}. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        KafkaConsumerGroup that = (KafkaConsumerGroup) o;

        return getConsumerGroupName().equals(that.getConsumerGroupName());
    }

    @Override
    public int hashCode() {
        return getConsumerGroupName().hashCode();
    }
}
Loading

0 comments on commit 4721c8c

Please sign in to comment.