-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: Add backend for topic events view based on offset range #2551
Changes from all commits
1b4238b
add2adc
a79418b
2bd174a
44d2450
e7f4c63
d385be8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.aiven.klaw.clusterapi.models.enums; | ||
|
||
import lombok.Getter; | ||
|
||
@Getter | ||
public enum TopicContentType { | ||
CUSTOM("custom"), | ||
RANGE("range"); | ||
|
||
private final String value; | ||
|
||
TopicContentType(String value) { | ||
this.value = value; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,13 @@ | ||
package io.aiven.klaw.clusterapi.services; | ||
|
||
import io.aiven.klaw.clusterapi.models.enums.TopicContentType; | ||
import io.aiven.klaw.clusterapi.utils.ClusterApiUtils; | ||
import java.time.Duration; | ||
import java.util.*; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.kafka.clients.CommonClientConfigs; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.common.TopicPartition; | ||
|
@@ -17,7 +19,7 @@ | |
@Service | ||
public class TopicContentsService { | ||
|
||
public static final String CUSTOM_OFFSET_SELECTION = "custom"; | ||
public static final int RANGE_MAX_RECORDS = 100; | ||
public static final int NUMBER_OF_POLLS = 3; | ||
final ClusterApiUtils clusterApiUtils; | ||
|
||
|
@@ -39,12 +41,14 @@ public Map<Long, String> readEvents( | |
String offsetPosition, | ||
Integer selectedPartitionId, | ||
Integer selectedNumberOfOffsets, | ||
Integer rangeOffsetsStart, | ||
Integer rangeOffsetsEnd, | ||
String readMessagesType, | ||
String clusterIdentification) { | ||
log.debug( | ||
"readEvents bootStrapServers {}, protocol {}, consumerGroupId {}," | ||
+ " topicName {}, offsetPosition {}, readMessagesType {} clusterIdentification {} selectedPartitionId {}" | ||
+ " selectedNumberOfOffsets {}", | ||
+ " selectedNumberOfOffsets {} rangeOffsetsStart {} rangeOffsetsEnd {}", | ||
bootStrapServers, | ||
protocol, | ||
consumerGroupId, | ||
|
@@ -53,11 +57,18 @@ public Map<Long, String> readEvents( | |
readMessagesType, | ||
clusterIdentification, | ||
selectedPartitionId, | ||
selectedNumberOfOffsets); | ||
selectedNumberOfOffsets, | ||
rangeOffsetsStart, | ||
rangeOffsetsEnd); | ||
|
||
Map<Long, String> eventMap = new TreeMap<>(); | ||
KafkaConsumer<String, String> consumer; | ||
|
||
if (offsetPosition.equals(TopicContentType.RANGE.getValue()) | ||
&& (rangeOffsetsStart < 0 || rangeOffsetsEnd < 0)) { | ||
return eventMap; | ||
} | ||
|
||
if (consumerGroupId.equals("notdefined")) { | ||
consumer = | ||
getKafkaConsumer( | ||
|
@@ -72,7 +83,8 @@ public Map<Long, String> readEvents( | |
Set<TopicPartition> topicPartitionsSet = consumer.assignment(); | ||
|
||
Set<TopicPartition> partitionsAssignment = new HashSet<>(); | ||
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) { | ||
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue()) | ||
|| offsetPosition.equals(TopicContentType.RANGE.getValue())) { | ||
for (TopicPartition tp : topicPartitionsSet) { | ||
if (tp.partition() == selectedPartitionId) { | ||
partitionsAssignment = Collections.singleton(tp); | ||
|
@@ -83,7 +95,10 @@ public Map<Long, String> readEvents( | |
partitionsAssignment = topicPartitionsSet; | ||
} | ||
|
||
if (partitionsAssignment.isEmpty()) { | ||
if (partitionsAssignment.isEmpty() | ||
|| (offsetPosition.equals(TopicContentType.RANGE.getValue()) | ||
&& rangeOffsetsStart > rangeOffsetsEnd)) { | ||
consumer.close(); | ||
return eventMap; | ||
} | ||
consumer.seekToBeginning(partitionsAssignment); | ||
|
@@ -93,8 +108,10 @@ public Map<Long, String> readEvents( | |
for (TopicPartition tp : partitionsAssignment) { | ||
long beginningOffset = consumer.position(tp); | ||
long endOffset = endOffsets.get(tp); | ||
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) { | ||
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())) { | ||
newOffset = endOffset - selectedNumberOfOffsets; | ||
} else if (offsetPosition.equals(TopicContentType.RANGE.getValue())) { | ||
newOffset = rangeOffsetsStart; | ||
} else { | ||
newOffset = endOffset - Integer.parseInt(offsetPosition); | ||
} | ||
|
@@ -107,11 +124,19 @@ public Map<Long, String> readEvents( | |
} | ||
|
||
int i = 0; | ||
boolean exitLoop = false; | ||
do { | ||
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500)); | ||
consumerRecords.forEach(record -> eventMap.put(record.offset(), record.value())); | ||
for (ConsumerRecord<String, String> record : consumerRecords) { | ||
eventMap.put(record.offset(), record.value()); | ||
if (offsetPosition.equals(TopicContentType.RANGE.getValue()) | ||
&& (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't we need range offsets start position ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks good |
||
exitLoop = true; | ||
break; | ||
} | ||
} | ||
i++; | ||
} while (i != NUMBER_OF_POLLS); | ||
} while (i != NUMBER_OF_POLLS && !exitLoop); | ||
|
||
consumer.commitAsync(); | ||
consumer.close(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can offset position be custom or range or id from FE ? so we don't introduce this action in url ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I work on FE I will make appropriate changes so that the FE will give
offsetPosition
as the two existing ones we have (number and custom) plus it will also give range, if thats what the user selects. I preferred to keep this PR only for BE first, then I can work on FE and make another PR.As far as changes in url, I was a little confused as to how to approach the problem. In current behavior the
offsetPosition
is used in two casesoffsetPosition
can be a number. in this case we ignore theselectedNumberOfOffsets
andselectedPartitionId
value in the url. Using theoffsetPosition
number we seek to beginning of topic and pull all data for all partitions upto the givenoffsetPosition
number.offsetPosition
is the string value custom in this case we use the provided values ofselectedNumberOfOffsets
andselectedPartitionId
to only pull data from the provided partition upto the the number provided inselectedNumberOfOffsets
Therefore in my case I felt I had the following options
offsetPosition
,selectedNumberOfOffsets
andselectedPartitionId
. IfoffsetPosition
is used to provide the value range then that leaves me with the other two for start and end of range, therefore the data will be for ALL partitions.Would you prefer that we keep the existing url exactly same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about offsetPosition = { number, custom, range }
Existing situation, if it's number, or custom, nothing changes in the url.
If it's range, we need partitionId, start and end offsets.
Looks good the way you have done it.
"/getTopicContents/{bootstrapServers}/"
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/"
+ "rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",