Merge pull request #36 from Nasdaq/0.4.0
Reusing Consumer Group
ruchirvaninasdaq authored Jul 19, 2021
2 parents 9274c03 + 51f9559 commit 02e7182
Showing 5 changed files with 89 additions and 24 deletions.
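
The central change is the consumer group naming in getConsumer: the group ID used to append a random UUID, so every run created a brand-new consumer group; it now combines the client ID, the stream name, and the current date, so restarts on the same day rejoin (reuse) the same group. A minimal sketch of the two conventions follows; clientID, streamName, and the date format are placeholders, not the SDK's real values (the SDK takes them from its own configuration and its getDate() helper):

    // Sketch only: illustrates the group-ID convention changed by this commit.
    // clientID, streamName, and the date format are placeholders, not the SDK's real values.
    String clientID = "my-client-id";
    String streamName = "EXAMPLE-STREAM";
    String date = new java.text.SimpleDateFormat("yyyy-MM-dd").format(new java.util.Date());

    // Before: a fresh consumer group on every start
    String oldGroupId = clientID + "_" + date + "_" + java.util.UUID.randomUUID().toString();

    // After: a stable group that is reused for the same client, stream, and day
    String newGroupId = clientID + "_" + streamName + "_" + date;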
2 changes: 1 addition & 1 deletion ncds-sdk/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.nasdaq.ncds</groupId>
<artifactId>ncds</artifactId>
<version>0.3.0</version>
<version>0.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -12,6 +12,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

@@ -82,8 +83,12 @@ public KafkaConsumer getKafkaConsumer(String streamName) throws Exception {
if (kafkaSchema == null) {
throw new Exception("Kafka Schema not Found for Stream: " + streamName);
}
kafkaConsumer = getConsumer(kafkaSchema);
kafkaConsumer.subscribe(Collections.singletonList(streamName + ".stream"));
kafkaConsumer = getConsumer(kafkaSchema, streamName);
TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0);
kafkaConsumer.assign(Collections.singletonList(topicPartition));
if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
return seekToMidNight(topicPartition);
}
}
catch (Exception e) {
throw (e);
@@ -106,7 +111,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
if (kafkaSchema == null) {
throw new Exception("Kafka Schema not Found for Stream: " + streamName);
}
kafkaConsumer = getConsumer(kafkaSchema);
kafkaConsumer = getConsumer(kafkaSchema, streamName);
TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0);
kafkaConsumer.assign(Collections.singleton(topicPartition));

@@ -137,7 +142,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
*/


public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
public KafkaAvroConsumer getConsumer(Schema avroSchema, String streamName) throws Exception {
try {
if(!IsItJunit.isJUnitTest()) {
ConfigProperties.resolveAndExportToSystemProperties(securityProps);
@@ -147,9 +152,9 @@ public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
kafkaProps.put("value.deserializer", AvroDeserializer.class.getName());
if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
}
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" +getDate() + "_" + UUID.randomUUID().toString());
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + streamName + "_" + getDate());
ConfigProperties.resolve(kafkaProps);
return new KafkaAvroConsumer(kafkaProps, avroSchema);
}
@@ -201,8 +206,12 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
if (newsSchema == null) {
throw new Exception("News Schema not Found ");
}
kafkaConsumer = getConsumer(newsSchema);
kafkaConsumer.subscribe(Collections.singletonList(topic+".stream"));
kafkaConsumer = getConsumer(newsSchema, topic);
TopicPartition topicPartition = new TopicPartition(topic + ".stream",0);
kafkaConsumer.assign(Collections.singletonList(topicPartition));
if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
return seekToMidNight(topicPartition);
}
return kafkaConsumer;
}
catch (Exception e){
@@ -217,4 +226,32 @@ private String getDate(){
String date = dateformat.format(new Date());
return date;
}

private KafkaConsumer seekToMidNight(TopicPartition topicPartition){
Map<TopicPartition,Long> timestmaps = new HashMap();
timestmaps.put(topicPartition , getTodayMidNightTimeStamp());
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = kafkaConsumer.offsetsForTimes(timestmaps);
OffsetAndTimestamp offsetAndTimestamp = null;
if (offsetsForTimes != null && (offsetAndTimestamp = offsetsForTimes.get(topicPartition)) != null) {
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
} else {
kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
}
return kafkaConsumer;
}

private long getTodayMidNightTimeStamp(){

TimeZone timeZone = TimeZone.getTimeZone("America/New_York");

Calendar today = Calendar.getInstance(timeZone);
today.set(Calendar.HOUR_OF_DAY, 0);
today.set(Calendar.MINUTE, 0);
today.set(Calendar.SECOND, 0);

long timestampFromMidnight = today.getTimeInMillis();

return timestampFromMidnight;
}

}
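
With the group now reusable, the SDK also stops relying on subscribe() plus auto.offset.reset alone: each consumer is pinned to partition 0 of its ".stream" topic with assign(), and when auto.offset.reset is "earliest" it seeks to the first offset at or after midnight US/Eastern, falling back to the beginning of the partition if nothing has been published since midnight. A standalone sketch of that seek pattern against the plain Kafka consumer API; the bootstrap server, group ID, and topic name are placeholders, not the SDK's configuration:

    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    public class SeekToMidnightSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("example.stream", 0);        // placeholder topic
                consumer.assign(Collections.singletonList(tp));

                // Midnight today in US Eastern time, as epoch milliseconds
                Calendar midnight = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"));
                midnight.set(Calendar.HOUR_OF_DAY, 0);
                midnight.set(Calendar.MINUTE, 0);
                midnight.set(Calendar.SECOND, 0);
                midnight.set(Calendar.MILLISECOND, 0);

                Map<TopicPartition, OffsetAndTimestamp> offsets =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, midnight.getTimeInMillis()));
                OffsetAndTimestamp oat = (offsets == null) ? null : offsets.get(tp);
                if (oat != null) {
                    consumer.seek(tp, oat.offset());                     // first record at or after midnight ET
                } else {
                    consumer.seekToBeginning(Collections.singleton(tp)); // nothing since midnight: start at the beginning
                }
            }
        }
    }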
@@ -5,10 +5,8 @@
import io.strimzi.kafka.oauth.common.ConfigProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
@@ -34,7 +32,6 @@ public ReadSchemaTopic(){

public Schema readSchema(String topic) throws Exception {
KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps));
schemaConsumer.subscribe(Collections.singletonList(controlSchemaName));
Duration sec = Duration.ofSeconds(10);
Schema messageSchema = null;
ConsumerRecord<String,GenericRecord> lastRecord=null;
@@ -92,7 +89,6 @@ public Set<String> getTopics() throws Exception{
Set<String> topics = new HashSet<String>();

KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps));
schemaConsumer.subscribe(Collections.singletonList(controlSchemaName));
Duration sec = Duration.ofSeconds(10);
while (true) {
ConsumerRecords<String, GenericRecord> schemaRecords = schemaConsumer.poll(sec);
@@ -130,34 +126,66 @@ private KafkaAvroConsumer getConsumer(String cleindId) throws Exception {
}

Schema.Parser parser = new Schema.Parser();
controlMessageSchema = parser.parse(ClassLoader.getSystemResourceAsStream("ControlMessageSchema.avsc"));
//controlMessageSchema = parser.parse(ClassLoader.getSystemResourceAsStream("ControlMessageSchema.avsc"));
controlMessageSchema = parser.parse(this.getClass().getResourceAsStream("/ControlMessageSchema.avsc"));

if (IsItJunit.isJUnitTest()) {
kafkaProps = KafkaConfigLoader.loadConfig();
}
kafkaProps.put("key.deserializer", StringSerializer.class.getName());
kafkaProps.put("value.deserializer", AvroDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, cleindId + "_" + UUID.randomUUID().toString());
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, cleindId);
kafkaProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576");
ConfigProperties.resolve(kafkaProps);
}
catch (Exception e) {
throw e;
}
return new KafkaAvroConsumer(kafkaProps, controlMessageSchema);

KafkaAvroConsumer kafkaAvroConsumer = new KafkaAvroConsumer(kafkaProps, controlMessageSchema);
TopicPartition topicPartition = new TopicPartition(controlSchemaName,0);
kafkaAvroConsumer.assign(Collections.singletonList(topicPartition));
return seekTo7DaysBack(kafkaAvroConsumer, topicPartition);
}

private Schema internalSchema (String topic) throws Exception {
try {
final Schema topicSchema;
Schema.Parser parser = new Schema.Parser();
topicSchema = parser.parse(ClassLoader.getSystemResourceAsStream("schemas/" + topic + ".avsc"));
topicSchema = parser.parse(this.getClass().getResourceAsStream("schemas/" + topic + ".avsc"));
return topicSchema;
} catch (Exception e){
throw new Exception("SCHEMA NOT FOUND FOR TOPIC: "+ topic);
}
}

}
private KafkaAvroConsumer seekTo7DaysBack(KafkaAvroConsumer kafkaAvroConsumer, TopicPartition topicPartition){
Map<TopicPartition,Long> timestmaps = new HashMap();
timestmaps.put(topicPartition , getTodayMidNightTimeStamp());
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = kafkaAvroConsumer.offsetsForTimes(timestmaps);
OffsetAndTimestamp offsetAndTimestamp = null;
if (offsetsForTimes != null && (offsetAndTimestamp = offsetsForTimes.get(topicPartition)) != null) {
kafkaAvroConsumer.seek(topicPartition, offsetAndTimestamp.offset());
} else {
kafkaAvroConsumer.seekToBeginning(Collections.singleton(topicPartition));
}
return kafkaAvroConsumer;
}

private long getTodayMidNightTimeStamp(){

TimeZone timeZone = TimeZone.getTimeZone("America/New_York");

Calendar today = Calendar.getInstance(timeZone);
today.add(Calendar.DATE, -7);
today.set(Calendar.HOUR_OF_DAY, 0);
today.set(Calendar.MINUTE, 0);
today.set(Calendar.SECOND, 0);

long timestampFromMidnight = today.getTimeInMillis();

return timestampFromMidnight;
}

}
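
ReadSchemaTopic gets the same treatment: the control-topic consumer now uses a fixed group ID derived from the client ID (no random UUID), assigns itself to partition 0 of the control topic, and seeks back to midnight seven days ago (America/New_York) rather than depending on a fresh group to re-read the schema messages. As a readability note only (not part of the commit), the same timestamp can be computed with java.time; the Calendar code above does not clear the millisecond field, so the two values can differ by under a second:

    import java.time.LocalDate;
    import java.time.ZoneId;

    // "Midnight seven days ago, US Eastern" as an epoch-millisecond timestamp.
    ZoneId eastern = ZoneId.of("America/New_York");
    long sevenDaysBackMidnight = LocalDate.now(eastern)
            .minusDays(7)
            .atStartOfDay(eastern)
            .toInstant()
            .toEpochMilli();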
4 changes: 2 additions & 2 deletions ncdssdk-client/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.nasdaq.ncds</groupId>
<artifactId>ncds</artifactId>
<version>0.3.0</version>
<version>0.4.0</version>
</parent>

<artifactId>ncdssdk-client</artifactId>
@@ -19,7 +19,7 @@
<dependency>
<groupId>com.nasdaq.ncds</groupId>
<artifactId>ncds-sdk</artifactId>
<version>0.3.0</version>
<version>0.4.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.nasdaq.ncds</groupId>
<artifactId>ncds</artifactId>
<version>0.3.0</version>
<version>0.4.0</version>
<packaging>pom</packaging>
<name>Nasdaq Cloud Data Service </name>

