Feature: Add backend for topic events view based on offset range #2551

Merged
Changes from 5 commits
@@ -23,7 +23,7 @@ public class TopicContentsController {
value =
"/getTopicContents/{bootstrapServers}/"
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}",
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",
Contributor @muralibasani commented on Aug 19, 2024:
Can offsetPosition be custom, range, or an id from the FE, so we don't introduce this action in the URL?

Contributor Author replied:

When I work on the FE I will make the appropriate changes so that the FE sends offsetPosition as one of the two existing values (number and custom), plus range if that's what the user selects. I preferred to keep this PR BE-only; I can then work on the FE and open another PR.

As far as changes to the URL go, I was a little confused about how to approach the problem. In the current behavior, offsetPosition is used in two cases:

  1. offsetPosition can be a number. In this case we ignore the selectedNumberOfOffsets and selectedPartitionId values in the URL. Using the offsetPosition number we seek to the beginning of the topic and pull data for all partitions up to the given offsetPosition number.
  2. offsetPosition is the string value custom. In this case we use the provided values of selectedNumberOfOffsets and selectedPartitionId to pull data only from the provided partition, up to the number provided in selectedNumberOfOffsets.

Therefore I felt I had the following options:

  1. Reuse the existing URL and try to do the range. In this case I have to figure out how to reuse offsetPosition, selectedNumberOfOffsets, and selectedPartitionId. If offsetPosition is used to provide the value range, that leaves me the other two for the start and end of the range, so the data would be for ALL partitions.
  2. Make some changes to the URL to accommodate the range.

Would you prefer that we keep the existing URL exactly the same?
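For illustration, the seek logic implied by these two existing cases, plus the proposed range case, can be sketched as a standalone method. This is not the actual Klaw code: the class and method names are invented, and only the offset arithmetic mirrors the PR's diff.

```java
// Illustrative sketch of the three offsetPosition modes; names are hypothetical.
public class OffsetSeekSketch {

    // Returns the offset the consumer would seek to for one partition,
    // given the partition's current end offset.
    public static long startOffsetFor(String offsetPosition, long endOffset,
                                      int selectedNumberOfOffsets, int rangeOffsetsStart) {
        if (offsetPosition.equals("custom")) {
            // last N events of the selected partition
            return endOffset - selectedNumberOfOffsets;
        } else if (offsetPosition.equals("range")) {
            // explicit start offset supplied by the caller
            return rangeOffsetsStart;
        } else {
            // plain number: last N events, applied to every partition
            return endOffset - Long.parseLong(offsetPosition);
        }
    }
}
```

With an end offset of 100: custom with 10 selected offsets seeks to 90, range with start 5 seeks to 5, and a plain "7" seeks to 93.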

Contributor replied:

How about offsetPosition = { number, custom, range }?
Existing situation: if it's number or custom, nothing changes in the URL.

If it's range, we need the partitionId and the start and end offsets.
Looks good the way you have done it.
"/getTopicContents/{bootstrapServers}/"
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/"
+ "rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",

method = RequestMethod.GET,
produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Map<Long, String>> getTopicContents(
@@ -34,7 +34,9 @@ public ResponseEntity<Map<Long, String>> getTopicContents(
@PathVariable String offsetPosition,
@PathVariable Integer selectedPartitionId,
@PathVariable Integer selectedNumberOfOffsets,
@PathVariable String clusterIdentification) {
@PathVariable String clusterIdentification,
@PathVariable Integer rangeOffsetsStart,
@PathVariable Integer rangeOffsetsEnd) {
Map<Long, String> events =
topicContentsService.readEvents(
bootstrapServers,
Expand All @@ -44,6 +46,8 @@ public ResponseEntity<Map<Long, String>> getTopicContents(
offsetPosition,
selectedPartitionId,
selectedNumberOfOffsets,
rangeOffsetsStart,
rangeOffsetsEnd,
"OFFSET_ID",
clusterIdentification);

@@ -0,0 +1,15 @@
package io.aiven.klaw.clusterapi.models.enums;

import lombok.Getter;

@Getter
public enum TopicContentType {
CUSTOM("custom"),
RANGE("range");

private final String value;

TopicContentType(String value) {
this.value = value;
}
}
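A minimal usage sketch of the new enum follows. It is re-declared here in plain Java so the snippet compiles without Lombok (the real class derives getValue() via @Getter); the wrapper class and helper method are invented for illustration.

```java
public class TopicContentTypeDemo {

    // Plain-Java copy of the PR's enum (the real one uses Lombok's @Getter).
    public enum TopicContentType {
        CUSTOM("custom"),
        RANGE("range");

        private final String value;

        TopicContentType(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }

    // Mirrors how the service compares the offsetPosition path variable:
    // anything that is neither "custom" nor "range" is treated as a plain number.
    public static boolean isRange(String offsetPosition) {
        return offsetPosition.equals(TopicContentType.RANGE.getValue());
    }
}
```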
@@ -1,11 +1,13 @@
package io.aiven.klaw.clusterapi.services;

import io.aiven.klaw.clusterapi.models.enums.TopicContentType;
import io.aiven.klaw.clusterapi.utils.ClusterApiUtils;
import java.time.Duration;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
@@ -17,7 +19,7 @@
@Service
public class TopicContentsService {

public static final String CUSTOM_OFFSET_SELECTION = "custom";
public static final int RANGE_MAX_RECORDS = 100;
public static final int NUMBER_OF_POLLS = 3;
final ClusterApiUtils clusterApiUtils;

@@ -39,12 +41,14 @@ public Map<Long, String> readEvents(
String offsetPosition,
Integer selectedPartitionId,
Integer selectedNumberOfOffsets,
Integer rangeOffsetsStart,
Integer rangeOffsetsEnd,
String readMessagesType,
String clusterIdentification) {
log.debug(
"readEvents bootStrapServers {}, protocol {}, consumerGroupId {},"
+ " topicName {}, offsetPosition {}, readMessagesType {} clusterIdentification {} selectedPartitionId {}"
+ " selectedNumberOfOffsets {}",
+ " selectedNumberOfOffsets {} rangeOffsetsStart {} rangeOffsetsEnd {}",
bootStrapServers,
protocol,
consumerGroupId,
@@ -53,11 +57,18 @@ public Map<Long, String> readEvents(
readMessagesType,
clusterIdentification,
selectedPartitionId,
selectedNumberOfOffsets);
selectedNumberOfOffsets,
rangeOffsetsStart,
rangeOffsetsEnd);

Map<Long, String> eventMap = new TreeMap<>();
KafkaConsumer<String, String> consumer;

if (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& (rangeOffsetsStart < 0 || rangeOffsetsEnd < 0)) {
return eventMap;
}

if (consumerGroupId.equals("notdefined")) {
consumer =
getKafkaConsumer(
@@ -72,7 +83,8 @@ public Map<Long, String> readEvents(
Set<TopicPartition> topicPartitionsSet = consumer.assignment();

Set<TopicPartition> partitionsAssignment = new HashSet<>();
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) {
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())
|| offsetPosition.equals(TopicContentType.RANGE.getValue())) {
for (TopicPartition tp : topicPartitionsSet) {
if (tp.partition() == selectedPartitionId) {
partitionsAssignment = Collections.singleton(tp);
@@ -83,7 +95,10 @@ public Map<Long, String> readEvents(
partitionsAssignment = topicPartitionsSet;
}

if (partitionsAssignment.isEmpty()) {
if (partitionsAssignment.isEmpty()
|| (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& rangeOffsetsStart > rangeOffsetsEnd)) {
consumer.close();
return eventMap;
}
consumer.seekToBeginning(partitionsAssignment);
@@ -93,8 +108,10 @@ public Map<Long, String> readEvents(
for (TopicPartition tp : partitionsAssignment) {
long beginningOffset = consumer.position(tp);
long endOffset = endOffsets.get(tp);
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) {
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())) {
newOffset = endOffset - selectedNumberOfOffsets;
} else if (offsetPosition.equals(TopicContentType.RANGE.getValue())) {
newOffset = rangeOffsetsStart;
} else {
newOffset = endOffset - Integer.parseInt(offsetPosition);
}
@@ -107,11 +124,19 @@ public Map<Long, String> readEvents(
}

int i = 0;
boolean exitLoop = false;
do {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
consumerRecords.forEach(record -> eventMap.put(record.offset(), record.value()));
for (ConsumerRecord<String, String> record : consumerRecords) {
eventMap.put(record.offset(), record.value());
if (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) {
Contributor commented:
Don't we need the range offsets start position?

Contributor Author replied:

I followed the existing pattern and used rangeOffsetsStart where we seek to the start position.

Inside the loop we check rangeOffsetsEnd only if the selection is range; this ensures that the existing default behavior is completely unchanged.
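The exit condition discussed in this thread can be sketched in isolation. Records are simulated here as offset/value entries in a list, so this is a stand-in for the real poll loop; the class name and list-based input are illustrative, while the constant and the break condition mirror the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeReadSketch {

    static final int RANGE_MAX_RECORDS = 100;

    // Stand-in for the poll-loop body: consume offset/value entries in order,
    // stopping once the range end offset is reached or the hard cap is hit
    // (mirrors exitLoop = true in the PR).
    public static Map<Long, String> collectRange(List<Map.Entry<Long, String>> records,
                                                 long rangeOffsetsEnd) {
        Map<Long, String> eventMap = new TreeMap<>();
        for (Map.Entry<Long, String> record : records) {
            eventMap.put(record.getKey(), record.getValue());
            if (record.getKey() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS) {
                break;
            }
        }
        return eventMap;
    }
}
```

For ten records at offsets 0 through 9 with rangeOffsetsEnd = 4, the map ends up holding offsets 0 through 4.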

Contributor replied:

Looks good.

exitLoop = true;
break;
}
}
i++;
} while (i != NUMBER_OF_POLLS);
} while (i != NUMBER_OF_POLLS && !exitLoop);

consumer.commitAsync();
consumer.close();
@@ -22,26 +22,18 @@
import io.aiven.klaw.clusterapi.models.enums.KafkaSupportedProtocol;
import io.aiven.klaw.clusterapi.models.enums.RequestOperationType;
import io.aiven.klaw.clusterapi.services.SchemaService;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.security.Key;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.crypto.spec.SecretKeySpec;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -64,7 +56,6 @@
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.tomcat.util.codec.binary.Base64;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
@@ -112,6 +103,8 @@ public class ClusterApiControllerIT {
@Value("${klaw.clusterapi.access.base64.secret}")
private String clusterAccessSecret;

private final UtilMethods utilMethods = new UtilMethods();

private static final String bootStrapServers = "localhost:9092";
private static final String bootStrapServersSsl = "localhost:9093";

@@ -131,7 +124,8 @@ public void getKafkaServerStatus() throws Exception {
.contentType(MediaType.APPLICATION_JSON)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -153,7 +147,8 @@ public void getKafkaServerStatusSSL() throws Exception {
.contentType(MediaType.APPLICATION_JSON)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -198,7 +193,8 @@ public void updateTopics() throws Exception {
.content(jsonReq)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -494,7 +490,8 @@ public void resetConsumerOffsetsToEarliest() throws Exception {
.content(jsonReq)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -564,7 +561,8 @@ public void resetConsumerOffsetsToLatest() throws Exception {
.content(jsonReq)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -634,7 +632,8 @@ public void resetConsumerOffsetsToLatestDontConsumeRecs() throws Exception {
.content(jsonReq)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -749,7 +748,8 @@ public void resetOffsetsNonExistingTopic() throws Exception {
.header(
AUTHORIZATION,
BEARER_PREFIX
+ generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
+ utilMethods.generateToken(
KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -770,14 +770,15 @@ public void getTopicContents() throws Exception {
+ "SSL/undefined/testtopic/custom/partitionId/0/"
+ "selectedNumberOfOffsets/"
+ numberOfOffsetsToRead
+ "/DEV2";
+ "/DEV2/rangeOffsets/0/0";
MockHttpServletResponse response =
mvc.perform(
MockMvcRequestBuilders.get(url)
.contentType(MediaType.APPLICATION_JSON)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
Expand All @@ -802,14 +803,15 @@ public void getTopicContentsInvalidPartitionId() throws Exception {
+ "/"
+ "selectedNumberOfOffsets/"
+ numberOfOffsetsToRead
+ "/DEV2";
+ "/DEV2/rangeOffsets/0/0";
MockHttpServletResponse response =
mvc.perform(
MockMvcRequestBuilders.get(url)
.contentType(MediaType.APPLICATION_JSON)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -869,7 +871,8 @@ private MockHttpServletResponse executeCreateTopicRequest(String jsonReq, String
.content(jsonReq)
.header(
AUTHORIZATION,
BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
BEARER_PREFIX
+ utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L))
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn()
@@ -901,23 +904,6 @@ private static ClusterTopicRequest updateTopicRequest(String topicName) {
.build();
}

private String generateToken(
String clusterApiUser, String clusterAccessSecret, long expirationTime) {
Key hmacKey =
new SecretKeySpec(
Base64.decodeBase64(clusterAccessSecret), SignatureAlgorithm.HS256.getJcaName());
Instant now = Instant.now();

return Jwts.builder()
.claim("name", clusterApiUser)
.subject(clusterApiUser)
.id(UUID.randomUUID().toString())
.issuedAt(Date.from(now))
.expiration(Date.from(now.plus(expirationTime, ChronoUnit.MINUTES)))
.signWith(hmacKey)
.compact();
}

private static Map<String, String> buildBrokerProperties() {
Map<String, String> brokerProperties = new HashMap<>();
