From 1b4238b6fdd85f05b9d7c7f07e9a3717b5273c48 Mon Sep 17 00:00:00 2001 From: khatib tamal Date: Thu, 15 Aug 2024 19:41:30 -0400 Subject: [PATCH 1/6] Feature: Add backend for topic events view based on offset range Signed-off-by: khatib tamal --- .../controller/TopicContentsController.java | 8 +- .../services/TopicContentsService.java | 32 ++- .../clusterapi/ClusterApiControllerIT.java | 62 ++--- .../clusterapi/TopicContentsControllerIT.java | 257 ++++++++++++++++++ .../io/aiven/klaw/clusterapi/UtilMethods.java | 24 ++ .../services/TopicContentsServiceTest.java | 2 + .../aiven/klaw/service/ClusterApiService.java | 8 +- 7 files changed, 345 insertions(+), 48 deletions(-) create mode 100644 cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/TopicContentsController.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/TopicContentsController.java index 04e3a3b9d2..3367b79d2a 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/TopicContentsController.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/TopicContentsController.java @@ -23,7 +23,7 @@ public class TopicContentsController { value = "/getTopicContents/{bootstrapServers}/" + "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/" - + "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}", + + "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}", method = RequestMethod.GET, produces = {MediaType.APPLICATION_JSON_VALUE}) public ResponseEntity> getTopicContents( @@ -34,7 +34,9 @@ public ResponseEntity> getTopicContents( @PathVariable String offsetPosition, @PathVariable Integer selectedPartitionId, @PathVariable Integer selectedNumberOfOffsets, - @PathVariable String clusterIdentification) { + @PathVariable String clusterIdentification, + @PathVariable Integer rangeOffsetsStart, + @PathVariable Integer rangeOffsetsEnd) { Map events = topicContentsService.readEvents( bootstrapServers, @@ -44,6 +46,8 @@ public ResponseEntity> getTopicContents( offsetPosition, selectedPartitionId, selectedNumberOfOffsets, + rangeOffsetsStart, + rangeOffsetsEnd, "OFFSET_ID", clusterIdentification); diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java index 8c4ec20530..8ec947b479 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; @@ -18,6 +19,8 @@ public class TopicContentsService { public static final String CUSTOM_OFFSET_SELECTION = "custom"; + public static final String RANGE_OFFSET_SELECTION = "range"; + public static final int RANGE_MAX_RECORDS = 100; public static final int NUMBER_OF_POLLS = 3; final ClusterApiUtils clusterApiUtils; @@ -39,12 +42,14 @@ public Map readEvents( String offsetPosition, Integer selectedPartitionId, Integer selectedNumberOfOffsets, + Integer rangeOffsetsStart, + Integer rangeOffsetsEnd, String readMessagesType, String clusterIdentification) { log.debug( "readEvents bootStrapServers {}, protocol {}, consumerGroupId {}," + " topicName {}, offsetPosition {}, readMessagesType {} clusterIdentification {} selectedPartitionId {}" - + " selectedNumberOfOffsets {}", + + " selectedNumberOfOffsets {} rangeOffsetsStart {} rangeOffsetsEnd {}", bootStrapServers, protocol, consumerGroupId, @@ -53,7 +58,9 @@ public Map readEvents( readMessagesType, clusterIdentification, selectedPartitionId, - selectedNumberOfOffsets); + selectedNumberOfOffsets, + rangeOffsetsStart, + rangeOffsetsEnd); Map eventMap = new TreeMap<>(); KafkaConsumer consumer; @@ -72,7 +79,8 @@ public Map readEvents( Set topicPartitionsSet = consumer.assignment(); Set partitionsAssignment = new HashSet<>(); - if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) { + if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION) + || offsetPosition.equals(RANGE_OFFSET_SELECTION)) { for (TopicPartition tp : topicPartitionsSet) { if (tp.partition() == selectedPartitionId) { partitionsAssignment = Collections.singleton(tp); @@ -83,7 +91,9 @@ public Map readEvents( partitionsAssignment = topicPartitionsSet; } - if (partitionsAssignment.isEmpty()) { + if (partitionsAssignment.isEmpty() + || (offsetPosition.equals(RANGE_OFFSET_SELECTION) && rangeOffsetsStart > rangeOffsetsEnd)) { + consumer.close(); return eventMap; } consumer.seekToBeginning(partitionsAssignment); @@ -95,6 +105,8 @@ public Map readEvents( long endOffset = endOffsets.get(tp); if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) { newOffset = endOffset - selectedNumberOfOffsets; + } else if (offsetPosition.equals(RANGE_OFFSET_SELECTION)) { + newOffset = rangeOffsetsStart; } else { newOffset = endOffset - Integer.parseInt(offsetPosition); } @@ -107,11 +119,19 @@ public Map readEvents( } int i = 0; + boolean exitLoop = false; do { ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(500)); - consumerRecords.forEach(record -> eventMap.put(record.offset(), record.value())); + for (ConsumerRecord record : consumerRecords) { + eventMap.put(record.offset(), record.value()); + if (offsetPosition.equals(RANGE_OFFSET_SELECTION) + && (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) { + exitLoop = true; + break; + } + } i++; - } while (i != NUMBER_OF_POLLS); + } while (i != NUMBER_OF_POLLS && !exitLoop); consumer.commitAsync(); consumer.close(); diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java index b5f6d8312d..74b32982c2 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java @@ -22,26 +22,18 @@ import io.aiven.klaw.clusterapi.models.enums.KafkaSupportedProtocol; import io.aiven.klaw.clusterapi.models.enums.RequestOperationType; import io.aiven.klaw.clusterapi.services.SchemaService; -import io.jsonwebtoken.Jwts; -import io.jsonwebtoken.SignatureAlgorithm; -import java.security.Key; -import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.crypto.spec.SecretKeySpec; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -64,7 +56,6 @@ import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; -import org.apache.tomcat.util.codec.binary.Base64; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -112,6 +103,8 @@ public class ClusterApiControllerIT { @Value("${klaw.clusterapi.access.base64.secret}") private String clusterAccessSecret; + private final UtilMethods utilMethods = new UtilMethods(); + private static final String bootStrapServers = "localhost:9092"; private static final String bootStrapServersSsl = "localhost:9093"; @@ -131,7 +124,8 @@ public void getKafkaServerStatus() throws Exception { .contentType(MediaType.APPLICATION_JSON) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -153,7 +147,8 @@ public void getKafkaServerStatusSSL() throws Exception { .contentType(MediaType.APPLICATION_JSON) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -198,7 +193,8 @@ public void updateTopics() throws Exception { .content(jsonReq) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -494,7 +490,8 @@ public void resetConsumerOffsetsToEarliest() throws Exception { .content(jsonReq) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -564,7 +561,8 @@ public void resetConsumerOffsetsToLatest() throws Exception { .content(jsonReq) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -634,7 +632,8 @@ public void resetConsumerOffsetsToLatestDontConsumeRecs() throws Exception { .content(jsonReq) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -749,7 +748,8 @@ public void resetOffsetsNonExistingTopic() throws Exception { .header( AUTHORIZATION, BEARER_PREFIX - + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + + utilMethods.generateToken( + KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -770,14 +770,15 @@ public void getTopicContents() throws Exception { + "SSL/undefined/testtopic/custom/partitionId/0/" + "selectedNumberOfOffsets/" + numberOfOffsetsToRead - + "/DEV2"; + + "/DEV2/rangeOffsets/0/0"; MockHttpServletResponse response = mvc.perform( MockMvcRequestBuilders.get(url) .contentType(MediaType.APPLICATION_JSON) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -802,14 +803,15 @@ public void getTopicContentsInvalidPartitionId() throws Exception { + "/" + "selectedNumberOfOffsets/" + numberOfOffsetsToRead - + "/DEV2"; + + "/DEV2/rangeOffsets/0/0"; MockHttpServletResponse response = mvc.perform( MockMvcRequestBuilders.get(url) .contentType(MediaType.APPLICATION_JSON) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -869,7 +871,8 @@ private MockHttpServletResponse executeCreateTopicRequest(String jsonReq, String .content(jsonReq) .header( AUTHORIZATION, - BEARER_PREFIX + generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn() @@ -901,23 +904,6 @@ private static ClusterTopicRequest updateTopicRequest(String topicName) { .build(); } - private String generateToken( - String clusterApiUser, String clusterAccessSecret, long expirationTime) { - Key hmacKey = - new SecretKeySpec( - Base64.decodeBase64(clusterAccessSecret), SignatureAlgorithm.HS256.getJcaName()); - Instant now = Instant.now(); - - return Jwts.builder() - .claim("name", clusterApiUser) - .subject(clusterApiUser) - .id(UUID.randomUUID().toString()) - .issuedAt(Date.from(now)) - .expiration(Date.from(now.plus(expirationTime, ChronoUnit.MINUTES))) - .signWith(hmacKey) - .compact(); - } - private static Map buildBrokerProperties() { Map brokerProperties = new HashMap<>(); diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java new file mode 100644 index 0000000000..c8a540b891 --- /dev/null +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java @@ -0,0 +1,257 @@ +package io.aiven.klaw.clusterapi; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; +import com.fasterxml.jackson.databind.type.TypeFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.mock.web.MockHttpServletResponse; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + classes = KafkaClusterApiApplication.class) +@AutoConfigureMockMvc +@TestPropertySource(locations = "classpath:application.properties") +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@DirtiesContext +@EmbeddedKafka( + brokerProperties = {"listeners=PLAINTEXT://" + TopicContentsControllerIT.BOOTSTRAP_SERVER}, + partitions = 1, + topics = {TopicContentsControllerIT.TEST_TOPIC_NAME}) +@Slf4j +public class TopicContentsControllerIT { + + public static final String CUSTOM_SELECTION = "custom"; + public static final String RANGE_SELECTION = "range"; + public static final String BOOTSTRAP_SERVER = "localhost:9092"; + public static final String TEST_TOPIC_NAME = "test-topic"; + public static final int TOTAL_TEST_RECORDS = 10; + public static final String KWCLUSTERAPIUSER = "kwclusterapiuser"; + public static final String AUTHORIZATION = "Authorization"; + public static final String BEARER_PREFIX = "Bearer "; + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Autowired private MockMvc mvc; + + @Value("${klaw.clusterapi.access.base64.secret}") + private String clusterAccessSecret; + + private static int totalRecordsProduced = 0; + private final UtilMethods utilMethods = new UtilMethods(); + + @Test + @Order(1) + void getTopicContentsWhenSelectedNumberOfOffsetsLessThanTotalRecords() throws Exception { + produceRecords(TOTAL_TEST_RECORDS); + + int totalOffsets = 5; + String url = + createUrl( + String.valueOf(totalOffsets), Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, totalOffsets, totalOffsets, TOTAL_TEST_RECORDS - 1); + } + + @Test + @Order(2) + void getTopicContentsWhenSelectedNumberOfOffsetsMoreThanTotalRecords() throws Exception { + int totalOffsets = TOTAL_TEST_RECORDS + 10; + String url = + createUrl( + String.valueOf(totalOffsets), Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, TOTAL_TEST_RECORDS, 0, TOTAL_TEST_RECORDS - 1); + } + + @Test + @Order(3) + void getTopicContentsWhenSelectedNumberOfOffsetsIsNegative() throws Exception { + String url = + createUrl(String.valueOf(-1), Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + assertThat(response).isEmpty(); + } + + @Test + @Order(4) + void getTopicContentsWhenCustomAndOffsetsLessThanTotalRecords() throws Exception { + int totalOffsets = 5; + String url = createUrl(CUSTOM_SELECTION, totalOffsets, Integer.MAX_VALUE, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, totalOffsets, totalOffsets, TOTAL_TEST_RECORDS - 1); + } + + @Test + @Order(5) + void getTopicContentsWhenCustomAndOffsetsMoreThanTotalRecords() throws Exception { + int totalOffsets = TOTAL_TEST_RECORDS + 10; + String url = createUrl(CUSTOM_SELECTION, totalOffsets, Integer.MAX_VALUE, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, TOTAL_TEST_RECORDS, 0, TOTAL_TEST_RECORDS - 1); + } + + @Test + @Order(6) + void getTopicContentsWhenCustomAndOffsetAndOffsetIsNegative() throws Exception { + String url = createUrl(CUSTOM_SELECTION, -1, Integer.MAX_VALUE, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + assertThat(response).isEmpty(); + } + + @Test + @Order(7) + void getTopicContentsWhenRangeAndValidBounds() throws Exception { + int start = 4; + int end = 7; + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, end); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, 4, start, end); + } + + @Test + @Order(8) + void getTopicContentsWhenRangeAndInvalidLowerBound() throws Exception { + int end = 7; + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, Integer.MIN_VALUE, end); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, 8, 0, end); + } + + @Test + @Order(9) + void getTopicContentsWhenRangeAndInvalidUpperBound() throws Exception { + int start = 4; + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, 6, start, 9); + } + + @Test + @Order(10) + void getTopicContentsWhenRangeAndInvalidUpperLowerBounds() throws Exception { + String url = + createUrl(RANGE_SELECTION, Integer.MAX_VALUE, Integer.MIN_VALUE, Integer.MAX_VALUE); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, TOTAL_TEST_RECORDS, 0, TOTAL_TEST_RECORDS - 1); + } + + @Test + @Order(11) + void getTopicContentsWhenRangeAndLowerBoundBiggerThanUpperBound() throws Exception { + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, 6, 3); + + Map response = getTopicContentsPerformMockRequest(url); + + assertThat(response).isEmpty(); + } + + @Test + @Order(12) + void getTopicContentsWhenRangeAndTotalOffsetsLargerThanMax() throws Exception { + produceRecords(200); + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, 20, 180); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, 100, 20, 119); + } + + private String createUrl( + String offsetType, int customNumberOfOffsets, int rangeStart, int rangeEnd) { + return String.format( + "/topics/getTopicContents/%s/PLAINTEXT/notdefined/test-topic/%s/partitionId/0/selectedNumberOfOffsets/%d/DEV_CLUSTER1/rangeOffsets/%d/%d", + BOOTSTRAP_SERVER, offsetType, customNumberOfOffsets, rangeStart, rangeEnd); + } + + private void produceRecords(int number) { + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + properties.setProperty( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + + ProducerRecord producerRecord; + for (int x = 0; x < number; x++) { + producerRecord = + new ProducerRecord<>("test-topic", String.format("value%d", totalRecordsProduced++)); + kafkaProducer.send(producerRecord); + } + + kafkaProducer.flush(); + kafkaProducer.close(); + } + + private Map getTopicContentsPerformMockRequest(String url) throws Exception { + MockHttpServletResponse response = + mvc.perform( + MockMvcRequestBuilders.get(url) + .contentType(MediaType.APPLICATION_JSON) + .header( + AUTHORIZATION, + BEARER_PREFIX + + utilMethods.generateToken(KWCLUSTERAPIUSER, clusterAccessSecret, 3L)) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse(); + + MapType mapType = + TypeFactory.defaultInstance().constructMapType(HashMap.class, Integer.class, String.class); + return OBJECT_MAPPER.readValue(response.getContentAsString(), mapType); + } + + private void getTopicContentsVerifyResponse( + Map response, int responseSize, int offsetIdStart, int offsetIdEnd) { + assertThat(response.size()).isEqualTo(responseSize); + for (int x = offsetIdStart; x <= offsetIdEnd; x++) { + assertThat(response.containsKey(x)).isTrue(); + assertThat(response.get(x)).isEqualTo(String.format("value%d", x)); + } + } +} diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java index 77dda31765..debcf047d8 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java @@ -22,12 +22,19 @@ import io.aiven.klaw.clusterapi.models.enums.KafkaSupportedProtocol; import io.aiven.klaw.clusterapi.models.enums.RequestOperationType; import io.aiven.klaw.clusterapi.models.enums.SchemaType; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.security.Key; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.*; +import javax.crypto.spec.SecretKeySpec; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; +import org.apache.tomcat.util.codec.binary.Base64; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -339,4 +346,21 @@ public Map> getConnectorsListMap() { responseMap.put("conn2", statusMap); return responseMap; } + + public String generateToken( + String clusterApiUser, String clusterAccessSecret, long expirationTime) { + Key hmacKey = + new SecretKeySpec( + Base64.decodeBase64(clusterAccessSecret), SignatureAlgorithm.HS256.getJcaName()); + Instant now = Instant.now(); + + return Jwts.builder() + .claim("name", clusterApiUser) + .subject(clusterApiUser) + .id(UUID.randomUUID().toString()) + .issuedAt(Date.from(now)) + .expiration(Date.from(now.plus(expirationTime, ChronoUnit.MINUTES))) + .signWith(hmacKey) + .compact(); + } } diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/TopicContentsServiceTest.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/TopicContentsServiceTest.java index ec746bb42a..3ec62428d8 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/TopicContentsServiceTest.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/TopicContentsServiceTest.java @@ -42,6 +42,8 @@ void readEvents() { offsetPosition + "", 0, 0, + 0, + 0, readMessagesType, TestConstants.CLUSTER_IDENTIFICATION); diff --git a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java index b21cf006bc..a83604de1c 100644 --- a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java +++ b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java @@ -104,6 +104,7 @@ public class ClusterApiService { public static final String TOPICS_NATIVE_TYPE = "topicsNativeType"; public static final String RESET_CACHE = "resetCache"; public static final String PARTITION_ID = "partitionId"; + public static final String RANGE_OFFSETS = "rangeOffsets"; public static final String SELECTED_NUMBER_OF_OFFSETS = "selectedNumberOfOffsets"; @Autowired private ManageDatabase manageDatabase; @@ -279,7 +280,10 @@ public Map getTopicEvents( String.valueOf(selectedPartitionId), SELECTED_NUMBER_OF_OFFSETS, String.valueOf(selectedNumberOfOffsets), - clusterIdentification); + clusterIdentification, + RANGE_OFFSETS, + String.valueOf(Integer.MAX_VALUE), + String.valueOf(Integer.MAX_VALUE)); ResponseEntity> resultBody = getRestTemplate(null) @@ -1269,7 +1273,7 @@ private String generateToken(String username) throws KlawException { .subject(username) .id(UUID.randomUUID().toString()) .issuedAt(Date.from(now)) - .expiration(Date.from(now.plus(3L, ChronoUnit.MINUTES))) // expiry in 3 minutes + .expiration(Date.from(now.plus(300L, ChronoUnit.MINUTES))) // expiry in 3 minutes .signWith(hmacKey) .compact(); } From add2adc6f968276868ef8d6ab9cdaf459fe8bf8a Mon Sep 17 00:00:00 2001 From: khatib tamal Date: Thu, 15 Aug 2024 20:41:12 -0400 Subject: [PATCH 2/6] revert accidental change Signed-off-by: khatib tamal --- core/src/main/java/io/aiven/klaw/service/ClusterApiService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java index a83604de1c..46edd083de 100644 --- a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java +++ b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java @@ -1273,7 +1273,7 @@ private String generateToken(String username) throws KlawException { .subject(username) .id(UUID.randomUUID().toString()) .issuedAt(Date.from(now)) - .expiration(Date.from(now.plus(300L, ChronoUnit.MINUTES))) // expiry in 3 minutes + .expiration(Date.from(now.plus(3L, ChronoUnit.MINUTES))) // expiry in 3 minutes .signWith(hmacKey) .compact(); } From a79418bc76c57ef5019753a9bf6e1a47e4b2725f Mon Sep 17 00:00:00 2001 From: khatib tamal Date: Thu, 15 Aug 2024 20:52:58 -0400 Subject: [PATCH 3/6] removed slf4j from TopicContentsControllerIT Signed-off-by: khatib tamal --- .../io/aiven/klaw/clusterapi/TopicContentsControllerIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java index c8a540b891..a06d82a3c0 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java @@ -9,7 +9,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -41,7 +40,6 @@ brokerProperties = {"listeners=PLAINTEXT://" + TopicContentsControllerIT.BOOTSTRAP_SERVER}, partitions = 1, topics = {TopicContentsControllerIT.TEST_TOPIC_NAME}) -@Slf4j public class TopicContentsControllerIT { public static final String CUSTOM_SELECTION = "custom"; From 2bd174a9d5c62bfcd5175e7730f12f9d3cdac9f2 Mon Sep 17 00:00:00 2001 From: khatib tamal Date: Wed, 21 Aug 2024 01:01:45 -0400 Subject: [PATCH 4/6] code review changes Signed-off-by: khatib tamal --- .../models/enums/TopicContentType.java | 17 ++++++++++ .../services/TopicContentsService.java | 19 ++++++----- .../clusterapi/TopicContentsControllerIT.java | 32 +++++++++++++------ .../aiven/klaw/service/ClusterApiService.java | 4 +-- 4 files changed, 52 insertions(+), 20 deletions(-) create mode 100644 cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java new file mode 100644 index 0000000000..6c33b747d7 --- /dev/null +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java @@ -0,0 +1,17 @@ +package io.aiven.klaw.clusterapi.models.enums; + +import lombok.Getter; + +@Getter +public enum TopicContentType { + + CUSTOM("custom"), + RANGE("range"); + + private final String value; + + TopicContentType(String value) { + this.value = value; + } + +} diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java index 8ec947b479..07774e945b 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java @@ -1,5 +1,6 @@ package io.aiven.klaw.clusterapi.services; +import io.aiven.klaw.clusterapi.models.enums.TopicContentType; import io.aiven.klaw.clusterapi.utils.ClusterApiUtils; import java.time.Duration; import java.util.*; @@ -18,8 +19,6 @@ @Service public class TopicContentsService { - public static final String CUSTOM_OFFSET_SELECTION = "custom"; - public static final String RANGE_OFFSET_SELECTION = "range"; public static final int RANGE_MAX_RECORDS = 100; public static final int NUMBER_OF_POLLS = 3; final ClusterApiUtils clusterApiUtils; @@ -65,6 +64,10 @@ public Map readEvents( Map eventMap = new TreeMap<>(); KafkaConsumer consumer; + if (offsetPosition.equals(TopicContentType.RANGE.getValue()) && (rangeOffsetsStart < 0 || rangeOffsetsEnd < 0)) { + return eventMap; + } + if (consumerGroupId.equals("notdefined")) { consumer = getKafkaConsumer( @@ -79,8 +82,8 @@ public Map readEvents( Set topicPartitionsSet = consumer.assignment(); Set partitionsAssignment = new HashSet<>(); - if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION) - || offsetPosition.equals(RANGE_OFFSET_SELECTION)) { + if (offsetPosition.equals(TopicContentType.CUSTOM.getValue()) + || offsetPosition.equals(TopicContentType.RANGE.getValue())) { for (TopicPartition tp : topicPartitionsSet) { if (tp.partition() == selectedPartitionId) { partitionsAssignment = Collections.singleton(tp); @@ -92,7 +95,7 @@ public Map readEvents( } if (partitionsAssignment.isEmpty() - || (offsetPosition.equals(RANGE_OFFSET_SELECTION) && rangeOffsetsStart > rangeOffsetsEnd)) { + || (offsetPosition.equals(TopicContentType.RANGE.getValue()) && rangeOffsetsStart > rangeOffsetsEnd)) { consumer.close(); return eventMap; } @@ -103,9 +106,9 @@ public Map readEvents( for (TopicPartition tp : partitionsAssignment) { long beginningOffset = consumer.position(tp); long endOffset = endOffsets.get(tp); - if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) { + if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())) { newOffset = endOffset - selectedNumberOfOffsets; - } else if (offsetPosition.equals(RANGE_OFFSET_SELECTION)) { + } else if (offsetPosition.equals(TopicContentType.RANGE.getValue())) { newOffset = rangeOffsetsStart; } else { newOffset = endOffset - Integer.parseInt(offsetPosition); @@ -124,7 +127,7 @@ public Map readEvents( ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord record : consumerRecords) { eventMap.put(record.offset(), record.value()); - if (offsetPosition.equals(RANGE_OFFSET_SELECTION) + if (offsetPosition.equals(TopicContentType.RANGE.getValue()) && (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) { exitLoop = true; break; diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java index a06d82a3c0..bca83ee877 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java @@ -145,40 +145,40 @@ void getTopicContentsWhenRangeAndValidBounds() throws Exception { @Test @Order(8) - void getTopicContentsWhenRangeAndInvalidLowerBound() throws Exception { + void getTopicContentsWhenRangeAndStartOffsetNegative() throws Exception { int end = 7; - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, Integer.MIN_VALUE, end); + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, -1, end); Map response = getTopicContentsPerformMockRequest(url); - getTopicContentsVerifyResponse(response, 8, 0, end); + assertThat(response).isEmpty(); } @Test @Order(9) - void getTopicContentsWhenRangeAndInvalidUpperBound() throws Exception { + void getTopicContentsWhenRangeAndEndOffsetNegative() throws Exception { int start = 4; - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, Integer.MAX_VALUE); + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, -1); Map response = getTopicContentsPerformMockRequest(url); - getTopicContentsVerifyResponse(response, 6, start, 9); + assertThat(response).isEmpty(); } @Test @Order(10) - void getTopicContentsWhenRangeAndInvalidUpperLowerBounds() throws Exception { + void getTopicContentsWhenRangeAndStartAndEndOffsetNegative() throws Exception { String url = - createUrl(RANGE_SELECTION, Integer.MAX_VALUE, Integer.MIN_VALUE, Integer.MAX_VALUE); + createUrl(RANGE_SELECTION, Integer.MAX_VALUE, -1, -1); Map response = getTopicContentsPerformMockRequest(url); - getTopicContentsVerifyResponse(response, TOTAL_TEST_RECORDS, 0, TOTAL_TEST_RECORDS - 1); + assertThat(response).isEmpty(); } @Test @Order(11) - void getTopicContentsWhenRangeAndLowerBoundBiggerThanUpperBound() throws Exception { + void getTopicContentsWhenRangeAndStartOffsetBiggerThanEndOffset() throws Exception { String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, 6, 3); Map response = getTopicContentsPerformMockRequest(url); @@ -188,6 +188,18 @@ void getTopicContentsWhenRangeAndLowerBoundBiggerThanUpperBound() throws Excepti @Test @Order(12) + void getTopicContentsWhenRangeAndStartAndEndOffsetEqual() throws Exception { + int start = 5; + String url = + createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, start); + + Map response = getTopicContentsPerformMockRequest(url); + + getTopicContentsVerifyResponse(response, 1, start, start); + } + + @Test + @Order(13) void getTopicContentsWhenRangeAndTotalOffsetsLargerThanMax() throws Exception { produceRecords(200); String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, 20, 180); diff --git a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java index 46edd083de..aff6c73229 100644 --- a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java +++ b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java @@ -282,8 +282,8 @@ public Map getTopicEvents( String.valueOf(selectedNumberOfOffsets), clusterIdentification, RANGE_OFFSETS, - String.valueOf(Integer.MAX_VALUE), - String.valueOf(Integer.MAX_VALUE)); + String.valueOf(-1), + String.valueOf(-1)); ResponseEntity> resultBody = getRestTemplate(null) From 44d2450c4ac3061475414e680c7dcc9341e9102e Mon Sep 17 00:00:00 2001 From: khatib tamal Date: Wed, 21 Aug 2024 01:06:54 -0400 Subject: [PATCH 5/6] ran mvn spotless Signed-off-by: khatib tamal --- .../clusterapi/models/enums/TopicContentType.java | 14 ++++++-------- .../clusterapi/services/TopicContentsService.java | 6 ++++-- .../klaw/clusterapi/TopicContentsControllerIT.java | 6 ++---- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java index 6c33b747d7..fab09544b0 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/enums/TopicContentType.java @@ -4,14 +4,12 @@ @Getter public enum TopicContentType { + CUSTOM("custom"), + RANGE("range"); - CUSTOM("custom"), - RANGE("range"); - - private final String value; - - TopicContentType(String value) { - this.value = value; - } + private final String value; + TopicContentType(String value) { + this.value = value; + } } diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java index 07774e945b..66cd09608d 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/TopicContentsService.java @@ -64,7 +64,8 @@ public Map readEvents( Map eventMap = new TreeMap<>(); KafkaConsumer consumer; - if (offsetPosition.equals(TopicContentType.RANGE.getValue()) && (rangeOffsetsStart < 0 || rangeOffsetsEnd < 0)) { + if (offsetPosition.equals(TopicContentType.RANGE.getValue()) + && (rangeOffsetsStart < 0 || rangeOffsetsEnd < 0)) { return eventMap; } @@ -95,7 +96,8 @@ public Map readEvents( } if (partitionsAssignment.isEmpty() - || (offsetPosition.equals(TopicContentType.RANGE.getValue()) && rangeOffsetsStart > rangeOffsetsEnd)) { + || (offsetPosition.equals(TopicContentType.RANGE.getValue()) + && rangeOffsetsStart > rangeOffsetsEnd)) { consumer.close(); return eventMap; } diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java index bca83ee877..590dfdcd27 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java @@ -168,8 +168,7 @@ void getTopicContentsWhenRangeAndEndOffsetNegative() throws Exception { @Test @Order(10) void getTopicContentsWhenRangeAndStartAndEndOffsetNegative() throws Exception { - String url = - createUrl(RANGE_SELECTION, Integer.MAX_VALUE, -1, -1); + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, -1, -1); Map response = getTopicContentsPerformMockRequest(url); @@ -190,8 +189,7 @@ void getTopicContentsWhenRangeAndStartOffsetBiggerThanEndOffset() throws Excepti @Order(12) void getTopicContentsWhenRangeAndStartAndEndOffsetEqual() throws Exception { int start = 5; - String url = - createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, start); + String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, start); Map response = getTopicContentsPerformMockRequest(url); From d385be8ece2c067c7dacc19d867148ad197fc69c Mon Sep 17 00:00:00 2001 From: Khatib Tamal Date: Fri, 23 Aug 2024 22:50:05 -0400 Subject: [PATCH 6/6] code review changes and ClusterApiControllerIT improvement Signed-off-by: Khatib Tamal --- .../clusterapi/ClusterApiControllerIT.java | 91 ++++++++----------- .../clusterapi/TopicContentsControllerIT.java | 28 +++--- 2 files changed, 53 insertions(+), 66 deletions(-) diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java index 74b32982c2..8cbdd7942c 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiControllerIT.java @@ -73,8 +73,6 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.mock.web.MockHttpServletResponse; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; @@ -86,7 +84,24 @@ @TestPropertySource(locations = "classpath:application.properties") @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @DirtiesContext -@EmbeddedKafka(kraft = false) +@EmbeddedKafka( + brokerProperties = { + "authorizer.class.name=kafka.security.authorizer.AclAuthorizer", + "allow.everyone.if.no.acl.found=true", + "super.users=User:ANONYMOUS", + "ssl.truststore.location=src/test/resources/selfsignedcerts/truststore.jks", + "ssl.truststore.password=klaw1234", + "ssl.keystore.location=src/test/resources/selfsignedcerts/keystore.p12", + "ssl.key.password=klaw1234", + "ssl.keystore.password=klaw1234", + "ssl.keystore.type=pkcs12", + "listeners=PLAINTEXT://" + + ClusterApiControllerIT.BOOTSTRAP_SERVERS + + ",SSL://" + + ClusterApiControllerIT.BOOTSTRAP_SERVERS_SSL + }, + partitions = 1, + adminTimeout = 100) @Slf4j public class ClusterApiControllerIT { @@ -97,17 +112,16 @@ public class ClusterApiControllerIT { public static final String BEARER_PREFIX = "Bearer "; public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final String TEST_MESSAGE = "A test message."; + public static final String BOOTSTRAP_SERVERS = "localhost:9092"; + public static final String BOOTSTRAP_SERVERS_SSL = "localhost:9093"; - static EmbeddedKafkaZKBroker embeddedKafkaBroker; + @Autowired private EmbeddedKafkaZKBroker embeddedKafkaBroker; @Value("${klaw.clusterapi.access.base64.secret}") private String clusterAccessSecret; private final UtilMethods utilMethods = new UtilMethods(); - private static final String bootStrapServers = "localhost:9092"; - private static final String bootStrapServersSsl = "localhost:9093"; - @Autowired private MockMvc mvc; ObjectMapper mapper = new ObjectMapper(); @@ -117,7 +131,7 @@ public class ClusterApiControllerIT { @Order(1) public void getKafkaServerStatus() throws Exception { String url = - "/topics/getStatus/" + bootStrapServers + "/PLAINTEXT/DEV1/kafka/kafkaFlavor/Apache Kafka"; + "/topics/getStatus/" + BOOTSTRAP_SERVERS + "/PLAINTEXT/DEV1/kafka/kafkaFlavor/Apache Kafka"; MockHttpServletResponse response = mvc.perform( MockMvcRequestBuilders.get(url) @@ -140,7 +154,7 @@ public void getKafkaServerStatus() throws Exception { @Order(2) public void getKafkaServerStatusSSL() throws Exception { String url = - "/topics/getStatus/" + bootStrapServersSsl + "/SSL/DEV2/kafka/kafkaFlavor/Apache Kafka"; + "/topics/getStatus/" + BOOTSTRAP_SERVERS_SSL + "/SSL/DEV2/kafka/kafkaFlavor/Apache Kafka"; MockHttpServletResponse response = mvc.perform( MockMvcRequestBuilders.get(url) @@ -255,7 +269,7 @@ public void createAclProducerIPAddress() throws Exception { ClusterAclRequest.builder() .clusterName("DEV2") .topicName(topicName) - .env(bootStrapServersSsl) + .env(BOOTSTRAP_SERVERS_SSL) .protocol(KafkaSupportedProtocol.SSL) .aclIp(ipHost) .aclNativeType(AclsNativeType.NATIVE.name()) @@ -298,7 +312,7 @@ public void createAclConsumerIPAddress() throws Exception { ClusterAclRequest.builder() .clusterName("DEV2") .topicName(topicName) - .env(bootStrapServersSsl) + .env(BOOTSTRAP_SERVERS_SSL) .protocol(KafkaSupportedProtocol.SSL) .aclIp(ipHost) .aclNativeType(AclsNativeType.NATIVE.name()) @@ -362,7 +376,7 @@ public void createAclProducerPrincipal() throws Exception { ClusterAclRequest.builder() .clusterName("DEV2") .topicName(topicName) - .env(bootStrapServersSsl) + .env(BOOTSTRAP_SERVERS_SSL) .protocol(KafkaSupportedProtocol.SSL) .aclSsl(principle) .aclNativeType(AclsNativeType.NATIVE.name()) @@ -407,7 +421,7 @@ public void createAclConsumerPrincipal() throws Exception { ClusterAclRequest.builder() .clusterName("DEV2") .topicName(TOPIC_NAME) - .env(bootStrapServersSsl) + .env(BOOTSTRAP_SERVERS_SSL) .protocol(KafkaSupportedProtocol.SSL) .aclSsl(principle) .aclNativeType(AclsNativeType.NATIVE.name()) @@ -474,7 +488,7 @@ public void createAclConsumerPrincipal() throws Exception { public void resetConsumerOffsetsToEarliest() throws Exception { produceAndConsumeRecords(true); // produce 10 records and consume all records - String url = "/topics/consumerGroupOffsets/reset/" + bootStrapServersSsl + "/SSL/" + "DEV2"; + String url = "/topics/consumerGroupOffsets/reset/" + BOOTSTRAP_SERVERS_SSL + "/SSL/" + "DEV2"; ResetConsumerGroupOffsetsRequest resetConsumerGroupOffsetsRequest = ResetConsumerGroupOffsetsRequest.builder() .offsetResetType(OffsetResetType.EARLIEST) @@ -545,7 +559,7 @@ public void resetConsumerOffsetsToEarliest() throws Exception { public void resetConsumerOffsetsToLatest() throws Exception { produceAndConsumeRecords(true); // produce 10 more records - String url = "/topics/consumerGroupOffsets/reset/" + bootStrapServersSsl + "/SSL/" + "DEV2"; + String url = "/topics/consumerGroupOffsets/reset/" + BOOTSTRAP_SERVERS_SSL + "/SSL/" + "DEV2"; ResetConsumerGroupOffsetsRequest resetConsumerGroupOffsetsRequest = ResetConsumerGroupOffsetsRequest.builder() .offsetResetType(OffsetResetType.LATEST) @@ -616,7 +630,7 @@ public void resetConsumerOffsetsToLatest() throws Exception { public void resetConsumerOffsetsToLatestDontConsumeRecs() throws Exception { produceAndConsumeRecords(false); // produce 10 more records - String url = "/topics/consumerGroupOffsets/reset/" + bootStrapServersSsl + "/SSL/" + "DEV2"; + String url = "/topics/consumerGroupOffsets/reset/" + BOOTSTRAP_SERVERS_SSL + "/SSL/" + "DEV2"; ResetConsumerGroupOffsetsRequest resetConsumerGroupOffsetsRequest = ResetConsumerGroupOffsetsRequest.builder() .offsetResetType(OffsetResetType.LATEST) @@ -727,7 +741,7 @@ public void deleteTopics() throws Exception { @Test @Order(15) public void resetOffsetsNonExistingTopic() throws Exception { - String url = "/topics/consumerGroupOffsets/reset/" + bootStrapServersSsl + "/SSL/" + "DEV2"; + String url = "/topics/consumerGroupOffsets/reset/" + BOOTSTRAP_SERVERS_SSL + "/SSL/" + "DEV2"; String nonExistingTopic = "topicdoesnotexist"; ResetConsumerGroupOffsetsRequest resetConsumerGroupOffsetsRequest = ResetConsumerGroupOffsetsRequest.builder() @@ -765,7 +779,7 @@ public void getTopicContents() throws Exception { int numberOfOffsetsToRead = 2; String url = "/topics/getTopicContents/" - + bootStrapServersSsl + + BOOTSTRAP_SERVERS_SSL + "/" + "SSL/undefined/testtopic/custom/partitionId/0/" + "selectedNumberOfOffsets/" @@ -796,7 +810,7 @@ public void getTopicContentsInvalidPartitionId() throws Exception { int partitionId = 5; String url = "/topics/getTopicContents/" - + bootStrapServersSsl + + BOOTSTRAP_SERVERS_SSL + "/" + "SSL/undefined/testtopic/custom/partitionId/" + partitionId @@ -826,7 +840,7 @@ private void produceAndConsumeRecords(boolean consumeRecs) String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer"; String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"; - configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer); Producer producer = new KafkaProducer<>(configProperties); @@ -840,7 +854,7 @@ private void produceAndConsumeRecords(boolean consumeRecs) if (consumeRecs) { Properties consumerConfigProperties = new Properties(); - consumerConfigProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + consumerConfigProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); consumerConfigProperties.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer); consumerConfigProperties.put( @@ -883,7 +897,7 @@ private static ClusterTopicRequest createTopicRequest(String topicName) { return ClusterTopicRequest.builder() .clusterName("DEV2") .topicName(topicName) - .env(bootStrapServersSsl) + .env(BOOTSTRAP_SERVERS_SSL) .protocol(KafkaSupportedProtocol.SSL) .partitions(1) .replicationFactor(Short.parseShort("1")) @@ -896,42 +910,11 @@ private static ClusterTopicRequest updateTopicRequest(String topicName) { return ClusterTopicRequest.builder() .clusterName("DEV2") .topicName(topicName) - .env(bootStrapServersSsl) + .env(BOOTSTRAP_SERVERS_SSL) .protocol(KafkaSupportedProtocol.SSL) .partitions(1) .replicationFactor(Short.parseShort("1")) .advancedTopicConfiguration(advancedConfig) .build(); } - - private static Map buildBrokerProperties() { - Map brokerProperties = new HashMap<>(); - - brokerProperties.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer"); - brokerProperties.put("allow.everyone.if.no.acl.found", "true"); - brokerProperties.put("super.users", "User:ANONYMOUS"); - brokerProperties.put( - "ssl.truststore.location", "src/test/resources/selfsignedcerts/truststore.jks"); - brokerProperties.put("ssl.truststore.password", "klaw1234"); - brokerProperties.put( - "ssl.keystore.location", "src/test/resources/selfsignedcerts/keystore.p12"); - brokerProperties.put("ssl.key.password", "klaw1234"); - brokerProperties.put("ssl.keystore.password", "klaw1234"); - brokerProperties.put("ssl.keystore.type", "pkcs12"); - brokerProperties.put( - "listeners", "PLAINTEXT://" + bootStrapServers + ",SSL://" + bootStrapServersSsl); - - return brokerProperties; - } - - @DynamicPropertySource - static void registerKafkaProperties(DynamicPropertyRegistry registry) { - embeddedKafkaBroker = new EmbeddedKafkaZKBroker(1, false, 1); - - for (Map.Entry stringStringEntry : buildBrokerProperties().entrySet()) { - embeddedKafkaBroker.brokerProperty(stringStringEntry.getKey(), stringStringEntry.getValue()); - } - embeddedKafkaBroker.setAdminTimeout(100); - embeddedKafkaBroker.afterPropertiesSet(); - } } diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java index 590dfdcd27..9aded7fddd 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/TopicContentsControllerIT.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.MapType; import com.fasterxml.jackson.databind.type.TypeFactory; +import io.aiven.klaw.clusterapi.models.enums.TopicContentType; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -42,8 +43,6 @@ topics = {TopicContentsControllerIT.TEST_TOPIC_NAME}) public class TopicContentsControllerIT { - public static final String CUSTOM_SELECTION = "custom"; - public static final String RANGE_SELECTION = "range"; public static final String BOOTSTRAP_SERVER = "localhost:9092"; public static final String TEST_TOPIC_NAME = "test-topic"; public static final int TOTAL_TEST_RECORDS = 10; @@ -103,7 +102,9 @@ void getTopicContentsWhenSelectedNumberOfOffsetsIsNegative() throws Exception { @Order(4) void getTopicContentsWhenCustomAndOffsetsLessThanTotalRecords() throws Exception { int totalOffsets = 5; - String url = createUrl(CUSTOM_SELECTION, totalOffsets, Integer.MAX_VALUE, Integer.MAX_VALUE); + String url = + createUrl( + TopicContentType.CUSTOM.getValue(), totalOffsets, Integer.MAX_VALUE, Integer.MAX_VALUE); Map response = getTopicContentsPerformMockRequest(url); @@ -114,7 +115,9 @@ void getTopicContentsWhenCustomAndOffsetsLessThanTotalRecords() throws Exception @Order(5) void getTopicContentsWhenCustomAndOffsetsMoreThanTotalRecords() throws Exception { int totalOffsets = TOTAL_TEST_RECORDS + 10; - String url = createUrl(CUSTOM_SELECTION, totalOffsets, Integer.MAX_VALUE, Integer.MAX_VALUE); + String url = + createUrl( + TopicContentType.CUSTOM.getValue(), totalOffsets, Integer.MAX_VALUE, Integer.MAX_VALUE); Map response = getTopicContentsPerformMockRequest(url); @@ -124,7 +127,8 @@ void getTopicContentsWhenCustomAndOffsetsMoreThanTotalRecords() throws Exception @Test @Order(6) void getTopicContentsWhenCustomAndOffsetAndOffsetIsNegative() throws Exception { - String url = createUrl(CUSTOM_SELECTION, -1, Integer.MAX_VALUE, Integer.MAX_VALUE); + String url = + createUrl(TopicContentType.CUSTOM.getValue(), -1, Integer.MAX_VALUE, Integer.MAX_VALUE); Map response = getTopicContentsPerformMockRequest(url); @@ -136,7 +140,7 @@ void getTopicContentsWhenCustomAndOffsetAndOffsetIsNegative() throws Exception { void getTopicContentsWhenRangeAndValidBounds() throws Exception { int start = 4; int end = 7; - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, end); + String url = createUrl(TopicContentType.RANGE.getValue(), Integer.MAX_VALUE, start, end); Map response = getTopicContentsPerformMockRequest(url); @@ -147,7 +151,7 @@ void getTopicContentsWhenRangeAndValidBounds() throws Exception { @Order(8) void getTopicContentsWhenRangeAndStartOffsetNegative() throws Exception { int end = 7; - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, -1, end); + String url = createUrl(TopicContentType.RANGE.getValue(), Integer.MAX_VALUE, -1, end); Map response = getTopicContentsPerformMockRequest(url); @@ -158,7 +162,7 @@ void getTopicContentsWhenRangeAndStartOffsetNegative() throws Exception { @Order(9) void getTopicContentsWhenRangeAndEndOffsetNegative() throws Exception { int start = 4; - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, -1); + String url = createUrl(TopicContentType.RANGE.getValue(), Integer.MAX_VALUE, start, -1); Map response = getTopicContentsPerformMockRequest(url); @@ -168,7 +172,7 @@ void getTopicContentsWhenRangeAndEndOffsetNegative() throws Exception { @Test @Order(10) void getTopicContentsWhenRangeAndStartAndEndOffsetNegative() throws Exception { - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, -1, -1); + String url = createUrl(TopicContentType.RANGE.getValue(), Integer.MAX_VALUE, -1, -1); Map response = getTopicContentsPerformMockRequest(url); @@ -178,7 +182,7 @@ void getTopicContentsWhenRangeAndStartAndEndOffsetNegative() throws Exception { @Test @Order(11) void getTopicContentsWhenRangeAndStartOffsetBiggerThanEndOffset() throws Exception { - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, 6, 3); + String url = createUrl(TopicContentType.RANGE.getValue(), Integer.MAX_VALUE, 6, 3); Map response = getTopicContentsPerformMockRequest(url); @@ -189,7 +193,7 @@ void getTopicContentsWhenRangeAndStartOffsetBiggerThanEndOffset() throws Excepti @Order(12) void getTopicContentsWhenRangeAndStartAndEndOffsetEqual() throws Exception { int start = 5; - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, start, start); + String url = createUrl(TopicContentType.RANGE.getValue(), Integer.MAX_VALUE, start, start); Map response = getTopicContentsPerformMockRequest(url); @@ -200,7 +204,7 @@ void getTopicContentsWhenRangeAndStartAndEndOffsetEqual() throws Exception { @Order(13) void getTopicContentsWhenRangeAndTotalOffsetsLargerThanMax() throws Exception { produceRecords(200); - String url = createUrl(RANGE_SELECTION, Integer.MAX_VALUE, 20, 180); + String url = createUrl(TopicContentType.RANGE.getValue(), Integer.MAX_VALUE, 20, 180); Map response = getTopicContentsPerformMockRequest(url);