diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f23036c1e4bf3..e54c717102730 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -235,14 +235,13 @@ public void testGenerateClientId() {
             KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
     }
 
-    private static Cluster mockCluster(int controllerIndex) {
+    private static Cluster mockCluster(int numNodes, int controllerIndex) {
         HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-        nodes.put(1, new Node(1, "localhost", 8122));
-        nodes.put(2, new Node(2, "localhost", 8123));
+        for (int i = 0; i < numNodes; i++)
+            nodes.put(i, new Node(i, "localhost", 8121 + i));
         return new Cluster("mockClusterId", nodes.values(),
-            Collections.emptySet(), Collections.emptySet(),
-            Collections.emptySet(), nodes.get(controllerIndex));
+                Collections.emptySet(), Collections.emptySet(),
+                Collections.emptySet(), nodes.get(controllerIndex));
     }
 
     private static Cluster mockBootstrapCluster() {
@@ -251,7 +250,7 @@ private static Cluster mockBootstrapCluster() {
     }
 
     private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
-        return new AdminClientUnitTestEnv(mockCluster(0), configVals);
+        return new AdminClientUnitTestEnv(mockCluster(3, 0), configVals);
     }
 
     @Test
@@ -357,7 +356,7 @@ public void testConnectionFailureOnMetadataUpdate() throws Exception {
 
         Cluster cluster = mockBootstrapCluster();
         try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
-            Cluster discoveredCluster = mockCluster(0);
+            Cluster discoveredCluster = mockCluster(3, 0);
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
             env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest,
@@ -383,7 +382,7 @@ public void testUnreachableBootstrapServer() throws Exception {
         Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L);
         try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
                 AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
-            Cluster discoveredCluster = mockCluster(0);
+            Cluster discoveredCluster = mockCluster(3, 0);
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
                 MetadataResponse.prepareResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
@@ -404,12 +403,12 @@ public void testUnreachableBootstrapServer() throws Exception {
      */
     @Test
     public void testPropagatedMetadataFetchException() throws Exception {
-        Cluster cluster = mockCluster(0);
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM,
+                mockCluster(3, 0),
                 newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
-                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+                        AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
+            env.kafkaClient().createPendingAuthenticationError(env.cluster().nodeById(0),
                 TimeUnit.DAYS.toMillis(1));
             env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE));
             KafkaFuture<Void> future = env.adminClient().createTopics(
@@ -434,11 +433,11 @@ public void testCreateTopics() throws Exception {
 
     @Test
     public void testCreateTopicsRetryBackoff() throws Exception {
-        Cluster cluster = mockCluster(0);
         MockTime time = new MockTime();
         int retryBackoff = 100;
 
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
+                mockCluster(3, 0),
             newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
             MockClient mockClient = env.kafkaClient();
 
@@ -563,7 +562,7 @@ public void testMetadataRetries() throws Exception {
 
         String topic = "topic";
         Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
-        Cluster initializedCluster = mockCluster(0);
+        Cluster initializedCluster = mockCluster(3, 0);
 
         try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
                 newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
@@ -830,14 +829,10 @@ public void testElectLeaders() throws Exception {
     @Ignore // The test is flaky. Should be renabled when this JIRA is fixed: https://issues.apache.org/jira/browse/KAFKA-5792
     @Test
     public void testHandleTimeout() throws Exception {
-        HashMap<Integer, Node> nodes = new HashMap<>();
         MockTime time = new MockTime();
-        nodes.put(0, new Node(0, "localhost", 8121));
-        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
-            Collections.emptySet(), Collections.emptySet(),
-            Collections.emptySet(), nodes.get(0));
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
-            AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
+                mockCluster(1, 0),
+                AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
                 AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             assertEquals(time, env.time());
@@ -948,11 +943,10 @@ public void testDeleteRecordsMultipleSends() throws Exception {
         TopicPartition tp0 = new TopicPartition(topic, 0);
         TopicPartition tp1 = new TopicPartition(topic, 1);
 
-        Cluster cluster = mockCluster(0);
         MockTime time = new MockTime();
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) {
-            List<Node> nodes = cluster.nodes();
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0))) {
+            List<Node> nodes = env.cluster().nodes();
 
             List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
             partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
@@ -964,8 +958,8 @@ public void testDeleteRecordsMultipleSends() throws Exception {
             List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false, partitionMetadata));
 
-            env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(cluster.nodes(),
-                cluster.clusterResource().clusterId(), cluster.controller().id(), topicMetadata));
+            env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
+                env.cluster().clusterResource().clusterId(), env.cluster().controller().id(), topicMetadata));
 
             Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> deletedPartitions = new HashMap<>();
             deletedPartitions.put(tp0, new DeleteRecordsResponse.PartitionResponse(3, Errors.NONE));
@@ -986,7 +980,6 @@ public void testDeleteRecordsMultipleSends() throws Exception {
 
     @Test
     public void testDeleteRecords() throws Exception {
-        HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
 
         List<PartitionInfo> partitionInfos = new ArrayList<>();
@@ -1092,29 +1085,13 @@ public void testDeleteRecords() throws Exception {
 
     @Test
     public void testDescribeCluster() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        Node node0 = new Node(0, "localhost", 8121);
-        Node node1 = new Node(1, "localhost", 8122);
-        Node node2 = new Node(2, "localhost", 8123);
-        Node node3 = new Node(3, "localhost", 8124);
-        nodes.put(0, node0);
-        nodes.put(1, node1);
-        nodes.put(2, node2);
-        nodes.put(3, node3);
-
-        final Cluster cluster = new Cluster(
-                "mockClusterId",
-                nodes.values(),
-                Collections.emptyList(),
-                Collections.emptySet(),
-                Collections.emptySet(), nodes.get(0));
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Prepare the metadata response used for the first describe cluster
             MetadataResponse response = MetadataResponse.prepareResponse(0,
-                    new ArrayList<>(nodes.values()),
+                    env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     2,
                     Collections.emptyList(),
@@ -1123,7 +1100,7 @@ public void testDescribeCluster() throws Exception {
 
             // Prepare the metadata response used for the second describe cluster
             MetadataResponse response2 = MetadataResponse.prepareResponse(0,
-                    new ArrayList<>(nodes.values()),
+                    env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     3,
                     Collections.emptyList(),
@@ -1147,24 +1124,8 @@ public void testDescribeCluster() throws Exception {
 
     @Test
     public void testListConsumerGroups() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        Node node0 = new Node(0, "localhost", 8121);
-        Node node1 = new Node(1, "localhost", 8122);
-        Node node2 = new Node(2, "localhost", 8123);
-        Node node3 = new Node(3, "localhost", 8124);
-        nodes.put(0, node0);
-        nodes.put(1, node1);
-        nodes.put(2, node2);
-        nodes.put(3, node3);
-
-        final Cluster cluster = new Cluster(
-                "mockClusterId",
-                nodes.values(),
-                Collections.emptyList(),
-                Collections.emptySet(),
-                Collections.emptySet(), nodes.get(0));
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Empty metadata response should be retried
@@ -1194,7 +1155,7 @@ public void testListConsumerGroups() throws Exception {
                                 .setGroupId("group-connect-1")
                                 .setProtocolType("connector")
                     ))),
-                node0);
+                env.cluster().nodeById(0));
 
             // handle retriable errors
             env.kafkaClient().prepareResponseFrom(
@@ -1203,14 +1164,14 @@ public void testListConsumerGroups() throws Exception {
                 new ListGroupsResponse(
                     new ListGroupsResponseData()
                         .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                         .setGroups(Collections.emptyList())
                 ),
-                node1);
+                env.cluster().nodeById(1));
             env.kafkaClient().prepareResponseFrom(
                 new ListGroupsResponse(
                     new ListGroupsResponseData()
                         .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
                         .setGroups(Collections.emptyList())
                 ),
-                node1);
+                env.cluster().nodeById(1));
             env.kafkaClient().prepareResponseFrom(
                 new ListGroupsResponse(
                     new ListGroupsResponseData()
@@ -1223,7 +1184,7 @@ public void testListConsumerGroups() throws Exception {
                                 .setGroupId("group-connect-2")
                                 .setProtocolType("connector")
                     ))),
-                node1);
+                env.cluster().nodeById(1));
 
             env.kafkaClient().prepareResponseFrom(
                 new ListGroupsResponse(
@@ -1237,7 +1198,7 @@ public void testListConsumerGroups() throws Exception {
                                 .setGroupId("group-connect-3")
                                 .setProtocolType("connector")
                     ))),
-                node2);
+                env.cluster().nodeById(2));
 
             // fatal error
             env.kafkaClient().prepareResponseFrom(
@@ -1245,7 +1206,7 @@ public void testListConsumerGroups() throws Exception {
                 new ListGroupsResponseData()
                     .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
                     .setGroups(Collections.emptyList())),
-                node3);
+                env.cluster().nodeById(3));
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
             TestUtils.assertFutureError(result.all(), UnknownServerException.class);
@@ -1265,20 +1226,7 @@ public void testListConsumerGroups() throws Exception {
 
     @Test
     public void testListConsumerGroupsMetadataFailure() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        Node node0 = new Node(0, "localhost", 8121);
-        Node node1 = new Node(1, "localhost", 8122);
-        Node node2 = new Node(2, "localhost", 8123);
-        nodes.put(0, node0);
-        nodes.put(1, node1);
-        nodes.put(2, node2);
-
-        final Cluster cluster = new Cluster(
-                "mockClusterId",
-                nodes.values(),
-                Collections.emptyList(),
-                Collections.emptySet(),
-                Collections.emptySet(), nodes.get(0));
+        final Cluster cluster = mockCluster(3, 0);
         final Time time = new MockTime();
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
@@ -1301,18 +1249,7 @@ public void testListConsumerGroupsMetadataFailure() throws Exception {
 
     @Test
     public void testDescribeConsumerGroups() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             //Retriable FindCoordinatorResponse errors should be retried
@@ -1407,18 +1344,7 @@ public void testDescribeConsumerGroups() throws Exception {
 
     @Test
     public void testDescribeMultipleConsumerGroups() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@@ -1476,18 +1402,7 @@ public void testDescribeMultipleConsumerGroups() throws Exception {
 
     @Test
     public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(
@@ -1514,18 +1429,7 @@ public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws E
 
     @Test
     public void testDescribeNonConsumerGroups() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@@ -1551,18 +1455,7 @@ public void testDescribeNonConsumerGroups() throws Exception {
 
     @Test
     public void testDescribeConsumerGroupOffsets() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Retriable FindCoordinatorResponse errors should be retried
@@ -1607,20 +1500,9 @@ public void testDescribeConsumerGroupOffsets() throws Exception {
 
     @Test
     public void testDeleteConsumerGroups() throws Exception {
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final List<String> groupIds = singletonList("group-0");
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             //Retriable FindCoordinatorResponse errors should be retried
@@ -1702,23 +1584,12 @@ public void testDeleteConsumerGroups() throws Exception {
     @Test
     public void testDeleteConsumerGroupOffsets() throws Exception {
         // Happy path
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
         final TopicPartition tp2 = new TopicPartition("bar", 0);
         final TopicPartition tp3 = new TopicPartition("foobar", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(
@@ -1759,21 +1630,10 @@ public void testDeleteConsumerGroupOffsets() throws Exception {
     @Test
     public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
         // Retriable errors should be retried
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(
@@ -1811,23 +1671,12 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
     @Test
     public void testDeleteConsumerGroupOffsetsNonRetriableErrors() throws Exception {
         // Non-retriable errors throw an exception
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
         final List<Errors> nonRetriableErrors = Arrays.asList(
             Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             for (Errors error : nonRetriableErrors) {
@@ -1850,21 +1699,10 @@ public void testDeleteConsumerGroupOffsetsNonRetriableErrors() throws Exception
     @Test
     public void testDeleteConsumerGroupOffsetsFindCoordinatorRetriableErrors() throws Exception {
        // Retriable FindCoordinatorResponse errors should be retried
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
        final String groupId = "group-0";
        final TopicPartition tp1 = new TopicPartition("foo", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
            env.kafkaClient().prepareResponse(
@@ -1890,21 +1728,10 @@ public void testDeleteConsumerGroupOffsetsFindCoordinatorRetriableErrors() throw
     @Test
     public void testDeleteConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() throws Exception {
        // Non-retriable FindCoordinatorResponse errors throw an exception
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
        final String groupId = "group-0";
        final TopicPartition tp1 = new TopicPartition("foo", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
            env.kafkaClient().prepareResponse(
@@ -2278,23 +2105,12 @@ public void testListPartitionReassignments() throws Exception {
     @Test
     public void testAlterConsumerGroupOffsets() throws Exception {
         // Happy path
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
         final TopicPartition tp2 = new TopicPartition("bar", 0);
         final TopicPartition tp3 = new TopicPartition("foobar", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(
@@ -2322,21 +2138,10 @@ public void testAlterConsumerGroupOffsets() throws Exception {
     @Test
     public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception {
         // Retriable errors should be retried
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(
@@ -2377,23 +2182,12 @@ public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception {
     @Test
     public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception {
         // Non-retriable errors throw an exception
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
         final List<Errors> nonRetriableErrors = Arrays.asList(
             Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             for (Errors error : nonRetriableErrors) {
@@ -2417,21 +2211,10 @@ public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception {
     @Test
     public void testAlterConsumerGroupOffsetsFindCoordinatorRetriableErrors() throws Exception {
         // Retriable FindCoordinatorResponse errors should be retried
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(
@@ -2459,21 +2242,10 @@ public void testAlterConsumerGroupOffsetsFindCoordinatorRetriableErrors() throws
     @Test
     public void testAlterConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() throws Exception {
         // Non-retriable FindCoordinatorResponse errors throw an exception
-        final Map<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
-
-        final Cluster cluster =
-                new Cluster(
-                        "mockClusterId",
-                        nodes.values(),
-                        Collections.emptyList(),
-                        Collections.emptySet(),
-                        Collections.emptySet(), nodes.get(0));
-
         final String groupId = "group-0";
         final TopicPartition tp1 = new TopicPartition("foo", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(