diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java index 978e6c75644..91639c9e32a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java @@ -20,11 +20,6 @@ public interface CruiseControlApi { */ String CC_REST_API_PROGRESS_KEY = "progress"; - /** - * User ID header key - */ - String CC_REST_API_USER_ID_HEADER = "User-Task-ID"; - /** * Gets the state of the Cruise Control server. * diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java index c2b146734bb..729dd32d4f4 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java @@ -30,6 +30,8 @@ import java.net.NoRouteToHostException; import java.util.concurrent.TimeoutException; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlHeaders.USER_TASK_ID_HEADER; + /** * Implementation of the Cruise Control API client */ @@ -121,7 +123,7 @@ private Future getCruiseControlState(String host, int por request.result().send(response -> { if (response.succeeded()) { if (response.result().statusCode() == 200 || response.result().statusCode() == 201) { - String userTaskID = response.result().getHeader(CC_REST_API_USER_ID_HEADER); + String userTaskID = response.result().getHeader(USER_TASK_ID_HEADER.toString()); response.result().bodyHandler(buffer -> { JsonObject json = buffer.toJsonObject(); if (json.containsKey(CC_REST_API_ERROR_KEY)) { @@ -151,7 +153,7 @@ private Future getCruiseControlState(String host, int por } if (userTaskId != null) { - request.result().putHeader(CC_REST_API_USER_ID_HEADER, userTaskId); + request.result().putHeader(USER_TASK_ID_HEADER.toString(), userTaskId); } }); }); @@ -165,7 +167,7 @@ private void internalRebalance(String host, int port, String path, String userTa } if (userTaskId != null) { - request.result().putHeader(CC_REST_API_USER_ID_HEADER, userTaskId); + request.result().putHeader(USER_TASK_ID_HEADER.toString(), userTaskId); } if (authHttpHeader != null) { @@ -176,14 +178,14 @@ private void internalRebalance(String host, int port, String path, String userTa if (response.succeeded()) { if (response.result().statusCode() == 200 || response.result().statusCode() == 201) { response.result().bodyHandler(buffer -> { - String userTaskID = response.result().getHeader(CC_REST_API_USER_ID_HEADER); + String userTaskID = response.result().getHeader(USER_TASK_ID_HEADER.toString()); JsonObject json = buffer.toJsonObject(); CruiseControlRebalanceResponse ccResponse = new CruiseControlRebalanceResponse(userTaskID, json); result.complete(ccResponse); }); } else if (response.result().statusCode() == 202) { response.result().bodyHandler(buffer -> { - String userTaskID = response.result().getHeader(CC_REST_API_USER_ID_HEADER); + String userTaskID = response.result().getHeader(USER_TASK_ID_HEADER.toString()); JsonObject json = buffer.toJsonObject(); CruiseControlRebalanceResponse ccResponse = new CruiseControlRebalanceResponse(userTaskID, json); if (json.containsKey(CC_REST_API_PROGRESS_KEY)) { @@ -199,7 +201,7 @@ private void internalRebalance(String host, int port, String path, String userTa }); } else if (response.result().statusCode() == 500) { response.result().bodyHandler(buffer -> { - String userTaskID = response.result().getHeader(CC_REST_API_USER_ID_HEADER); + String userTaskID = response.result().getHeader(USER_TASK_ID_HEADER.toString()); JsonObject json = buffer.toJsonObject(); if (json.containsKey(CC_REST_API_ERROR_KEY)) { // If there was a client side error, check whether it was due to not enough data being available ... @@ -321,7 +323,7 @@ public Future getUserTaskStatus(String host, int port, St request.result().send(response -> { if (response.succeeded()) { if (response.result().statusCode() == 200 || response.result().statusCode() == 201) { - String userTaskID = response.result().getHeader(CC_REST_API_USER_ID_HEADER); + String userTaskID = response.result().getHeader(USER_TASK_ID_HEADER.toString()); response.result().bodyHandler(buffer -> { JsonObject json = buffer.toJsonObject(); JsonObject jsonUserTask = json.getJsonArray("userTasks").getJsonObject(0); @@ -418,7 +420,7 @@ public Future stopExecution(String host, int port) { request.result().send(response -> { if (response.succeeded()) { if (response.result().statusCode() == 200 || response.result().statusCode() == 201) { - String userTaskID = response.result().getHeader(CC_REST_API_USER_ID_HEADER); + String userTaskID = response.result().getHeader(USER_TASK_ID_HEADER.toString()); response.result().bodyHandler(buffer -> { JsonObject json = buffer.toJsonObject(); if (json.containsKey(CC_REST_API_ERROR_KEY)) { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java index 914239d9b99..e224e487d97 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.LogManager; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlHeaders.USER_TASK_ID_HEADER; import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockserver.model.Header.header; import static org.mockserver.model.HttpRequest.request; @@ -134,7 +135,7 @@ public void setupCCStateResponse() { .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true|false")) .withQueryStringParameter(Parameter.param(CruiseControlParameters.VERBOSE.toString(), "true|false")) .withPath(CruiseControlEndpoints.STATE.toString()) - .withHeaders(header(CruiseControlApi.CC_REST_API_USER_ID_HEADER, STATE_PROPOSAL_NOT_READY), + .withHeaders(header(USER_TASK_ID_HEADER.toString(), STATE_PROPOSAL_NOT_READY), AUTH_HEADER) .withSecure(true)) .respond( @@ -204,7 +205,7 @@ public void setupCCRebalanceNotEnoughDataError(CruiseControlEndpoints endpoint) response() .withStatusCode(500) .withBody(jsonError) - .withHeaders(header(CruiseControlApi.CC_REST_API_USER_ID_HEADER, REBALANCE_NOT_ENOUGH_VALID_WINDOWS_ERROR_RESPONSE_UTID)) + .withHeaders(header(USER_TASK_ID_HEADER.toString(), REBALANCE_NOT_ENOUGH_VALID_WINDOWS_ERROR_RESPONSE_UTID)) .withDelay(TimeUnit.SECONDS, 0)); } @@ -231,7 +232,7 @@ public void setupCCBrokerDoesNotExist(CruiseControlEndpoints endpoint) { response() .withStatusCode(500) .withBody(jsonError) - .withHeaders(header(CruiseControlApi.CC_REST_API_USER_ID_HEADER, BROKERS_NOT_EXIST_ERROR_RESPONSE_UTID)) + .withHeaders(header(USER_TASK_ID_HEADER.toString(), BROKERS_NOT_EXIST_ERROR_RESPONSE_UTID)) .withDelay(TimeUnit.SECONDS, 0)); } diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlHeaders.java b/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlHeaders.java new file mode 100644 index 00000000000..21f2d35fa0f --- /dev/null +++ b/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlHeaders.java @@ -0,0 +1,31 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.common.model.cruisecontrol; + +/** + * Enum with Cruise Control headers + */ +public enum CruiseControlHeaders { + /** + * User task id + */ + USER_TASK_ID_HEADER("User-Task-ID"); + + private final String name; + + /** + * Creates the Enum from String + * + * @param name String with the path + */ + CruiseControlHeaders(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } +} diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java index 5731eee868b..21544ebc1f2 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java @@ -347,7 +347,6 @@ private static boolean checkReplicaChangeFailureDueToInsufficientBrokers(KafkaTo if (kafkaTopic != null && kafkaTopic.getStatus() != null && kafkaTopic.getStatus().getReplicasChange() != null) { String message = kafkaTopic.getStatus().getReplicasChange().getMessage(); return message != null && - message.contains("Replicas change failed (500), Error processing POST request") && message.contains("Requested RF cannot be more than number of alive brokers") && kafkaTopic.getStatus().getReplicasChange().getState().toValue().equals("pending") && kafkaTopic.getStatus().getReplicasChange().getTargetReplicas() == targetReplicas; diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java b/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java index b0be4b03c1a..5e8135c08f4 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java @@ -4,84 +4,73 @@ */ package io.strimzi.operator.topic; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.strimzi.api.kafka.model.topic.KafkaTopic; import io.strimzi.api.kafka.model.topic.KafkaTopicStatusBuilder; import io.strimzi.api.kafka.model.topic.ReplicasChangeStatusBuilder; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.ReconciliationLogger; -import io.strimzi.operator.common.model.cruisecontrol.CruiseControlEndpoints; -import io.strimzi.operator.common.model.cruisecontrol.CruiseControlParameters; +import io.strimzi.operator.topic.cruisecontrol.CruiseControlClient; +import io.strimzi.operator.topic.cruisecontrol.CruiseControlClient.TaskState; +import io.strimzi.operator.topic.cruisecontrol.CruiseControlClient.UserTasksResponse; +import io.strimzi.operator.topic.cruisecontrol.CruiseControlClientImpl; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; - -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.URI; -import java.net.URLEncoder; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.security.KeyStore; -import java.security.cert.Certificate; -import java.security.cert.CertificateFactory; -import java.time.Duration; -import java.time.temporal.ChronoUnit; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import static io.strimzi.api.kafka.model.topic.ReplicasChangeState.ONGOING; import static io.strimzi.api.kafka.model.topic.ReplicasChangeState.PENDING; -import static io.strimzi.operator.common.CruiseControlUtil.buildBasicAuthValue; -import static io.strimzi.operator.topic.TopicOperatorUtil.getFileContent; import static io.strimzi.operator.topic.TopicOperatorUtil.hasReplicasChange; import static io.strimzi.operator.topic.TopicOperatorUtil.topicNames; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.stream.Collectors.groupingBy; +import static org.apache.logging.log4j.core.util.Throwables.getRootCause; /** * Replicas change handler that interacts with Cruise Control REST API. *

- * The REST endpoints are {@code topic_configuration} to request replication factor changes and {@code user_tasks} - * to check the asynchronous execution result. Cruise Control runs one task execution at a time, additional requests - * are queued up to {@code max.active.user.tasks}. - *

* At any given time, a KafkaTopic.spec.replicas change can be in one of the following states: - *
*
  • Pending: Not in Cruise Control's task queue (not yet sent or request error).
  • *
  • Ongoing: In Cruise Control's task queue, but execution not started, or not completed.
  • *
  • Completed: Cruise Control's task execution completed (target replication factor reconciled).
*/ public class ReplicasChangeHandler { private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(ReplicasChangeHandler.class); - private static final String USER_TASK_ID_HEADER = "User-Task-ID"; - private static final long REQUEST_TIMEOUT_SEC = 60; - private final TopicOperatorConfig config; - private final ExecutorService httpClientExecutor; - private final ObjectMapper mapper; + private final CruiseControlClient apiClient; /** - * Create a new replicas change client instance. + * Create a new replicas change handler instance. * * @param config Topic Operator configuration. */ public ReplicasChangeHandler(TopicOperatorConfig config) { - this.config = config; - this.httpClientExecutor = Executors.newCachedThreadPool(); - this.mapper = new ObjectMapper(); + this.apiClient = new CruiseControlClientImpl( + config.cruiseControlHostname(), + config.cruiseControlPort(), + config.cruiseControlRackEnabled(), + config.cruiseControlSslEnabled(), + config.cruiseControlSslEnabled() ? getFileContent(config.cruiseControlCrtFilePath()) : null, + config.cruiseControlAuthEnabled(), + config.cruiseControlAuthEnabled() ? new String(getFileContent(config.cruiseControlApiUserPath()), UTF_8) : null, + config.cruiseControlAuthEnabled() ? new String(getFileContent(config.cruiseControlApiPassPath()), UTF_8) : null + ); + } + + /** + * Stop the replicas change handler. + */ + public void stop() { + try { + apiClient.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } } /** @@ -102,22 +91,11 @@ public List requestPendingChanges(List rec try { LOGGER.debugOp("Sending topic configuration request, topics {}", topicNames(reconcilableTopics)); - HttpClient client = buildHttpClient(); - client.sendAsync(buildPostTopicConfigRequest(reconcilableTopics), HttpResponse.BodyHandlers.ofString()) - .thenAccept(response -> { - if (response.statusCode() != 200) { - updateToFailed(result, format("Replicas change failed (%s)", response.statusCode()), response); - return; - } - try { - String userTaskId = response.headers().firstValue(USER_TASK_ID_HEADER).get(); - updateToOngoing(result, "Replicas change ongoing", userTaskId); - } catch (Throwable e) { - updateToFailed(result, format("Failed to get %s header", USER_TASK_ID_HEADER), response); - } - }).join(); + List kafkaTopics = reconcilableTopics.stream().map(rt -> rt.kt()).collect(Collectors.toList()); + String userTaskId = apiClient.topicConfiguration(kafkaTopics); + updateToOngoing(result, "Replicas change ongoing", userTaskId); } catch (Throwable t) { - updateToFailed(result, format("Replicas change failed, %s", t.getMessage()), null); + updateToFailed(result, format("Replicas change failed, %s", getRootCause(t).getMessage())); } return result; } @@ -144,130 +122,40 @@ public List requestOngoingChanges(List rec try { LOGGER.debugOp("Sending user tasks request, Tasks {}", groupByUserTaskId.keySet()); - HttpClient client = buildHttpClient(); - client.sendAsync(buildGetUserTasksRequest(groupByUserTaskId.keySet()), HttpResponse.BodyHandlers.ofString()) - .thenAccept(response -> { - try { - if (response.statusCode() != 200 || response.body() == null) { - updateToFailed(result, format("Replicas change failed (%s)", response.statusCode()), response); - return; - } - - UserTasksResponse utr = mapper.readValue(response.body(), UserTasksResponse.class); - if (utr.userTasks().isEmpty()) { - // Cruise Control restarted: reset the state because the tasks queue is not persisted - updateToPending(result, "Task not found, Resetting the state"); - return; - } - - for (var userTask : utr.userTasks()) { - String userTaskId = userTask.userTaskId(); - TaskState state = TaskState.get(userTask.status()); - switch (state) { - case COMPLETED: - updateToCompleted(groupByUserTaskId.get(userTaskId), "Replicas change completed"); - break; - case COMPLETED_WITH_ERROR: - updateToFailed(groupByUserTaskId.get(userTaskId), "Replicas change failed", response); - break; - case ACTIVE: - case IN_EXECUTION: - // do nothing - break; - } - } - } catch (Throwable t) { - updateToFailed(result, format("Failed to parse response, %s", t.getMessage()), null); + UserTasksResponse utr = apiClient.userTasks(new ArrayList<>(groupByUserTaskId.keySet())); + + if (utr.userTasks().isEmpty()) { + // Cruise Control restarted: reset the state because the tasks queue is not persisted + updateToPending(result, "Task not found, Resetting the state"); + } else { + for (var userTask : utr.userTasks()) { + String userTaskId = userTask.userTaskId(); + TaskState state = TaskState.get(userTask.status()); + switch (state) { + case COMPLETED: + updateToCompleted(groupByUserTaskId.get(userTaskId), "Replicas change completed"); + break; + case COMPLETED_WITH_ERROR: + updateToFailed(groupByUserTaskId.get(userTaskId), "Replicas change completed with error"); + break; + case ACTIVE: + case IN_EXECUTION: + // do nothing + break; } - }).join(); + } + } } catch (Throwable t) { - updateToFailed(result, format("Replicas change failed, %s", t.getMessage()), null); + updateToFailed(result, format("Replicas change failed, %s", getRootCause(t).getMessage())); } return result; } - @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") - private HttpClient buildHttpClient() throws Exception { - HttpClient.Builder builder = HttpClient.newBuilder().executor(httpClientExecutor); - if (config.cruiseControlSslEnabled()) { - // load the certificate chain to be trusted - CertificateFactory cf = CertificateFactory.getInstance("X.509"); - Certificate ca; - try (var caInput = new ByteArrayInputStream(getFileContent(config.cruiseControlCrtFilePath()))) { - ca = cf.generateCertificate(caInput); - } catch (IOException ioe) { - throw new RuntimeException(format("File not found: %s", config.cruiseControlCrtFilePath())); - } - // create a P12 keystore containing our trusted chain - KeyStore keyStore = KeyStore.getInstance("PKCS12"); - keyStore.load(null, null); - keyStore.setCertificateEntry("ca", ca); - // create a trust manager that trusts the chain in our keystore - TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); - tmf.init(keyStore); - // create an SSL context that uses our trust manager - SSLContext sslContext = SSLContext.getInstance("TLS"); - sslContext.init(null, tmf.getTrustManagers(), null); - builder.sslContext(sslContext); - } - return builder.build(); - } - - private HttpRequest buildPostTopicConfigRequest(List reconcilableTopics) { - StringBuilder url = new StringBuilder( - format("%s://%s:%d%s?", config.cruiseControlSslEnabled() ? "https" : "http", - config.cruiseControlHostname(), config.cruiseControlPort(), CruiseControlEndpoints.TOPIC_CONFIGURATION)); - url.append(format("%s=%s&", CruiseControlParameters.SKIP_RACK_AWARENESS_CHECK, !config.cruiseControlRackEnabled())); - url.append(format("%s=%s&", CruiseControlParameters.DRY_RUN, "false")); - url.append(format("%s=%s", CruiseControlParameters.JSON, "true")); - - Map> topicsByReplicas = reconcilableTopics.stream() - .collect(groupingBy(rt -> rt.kt().getSpec().getReplicas())); - Map requestPayload = new HashMap<>(); - topicsByReplicas.entrySet().forEach(es -> { - int rf = es.getKey(); - List targetNames = topicNames(topicsByReplicas.get(rf)); - requestPayload.put(rf, String.join("|", targetNames)); - }); - String json; + private static byte[] getFileContent(String filePath) { try { - json = mapper.writeValueAsString( - new ReplicationFactorChanges(new ReplicationFactor(requestPayload))); - } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize request body"); - } - - LOGGER.traceOp("Request URL: {}, body: {}", url, json); - HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(URI.create(url.toString())) - .timeout(Duration.of(REQUEST_TIMEOUT_SEC, ChronoUnit.SECONDS)) - .header("Content-Type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(json)); - maybeAddBasicAuthHeader(builder); - return builder.build(); - } - - private HttpRequest buildGetUserTasksRequest(Set userTaskIds) { - StringBuilder url = new StringBuilder( - format("%s://%s:%d%s?", config.cruiseControlSslEnabled() ? "https" : "http", - config.cruiseControlHostname(), config.cruiseControlPort(), CruiseControlEndpoints.USER_TASKS)); - url.append(format("%s=%s&", CruiseControlParameters.USER_TASK_IDS, URLEncoder.encode(String.join(",", userTaskIds), UTF_8))); - url.append(format("%s=%s", CruiseControlParameters.JSON, "true")); - - LOGGER.traceOp("Request URL: {}", url); - HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(URI.create(url.toString())) - .timeout(Duration.of(REQUEST_TIMEOUT_SEC, ChronoUnit.SECONDS)) - .GET(); - maybeAddBasicAuthHeader(builder); - return builder.build(); - } - - private void maybeAddBasicAuthHeader(HttpRequest.Builder builder) { - if (config.cruiseControlAuthEnabled()) { - String apiUsername = new String(getFileContent(config.cruiseControlApiUserPath()), UTF_8); - String apiPassword = new String(getFileContent(config.cruiseControlApiPassPath()), UTF_8); - builder.header("Authorization", buildBasicAuthValue(apiUsername, apiPassword)); + return Files.readAllBytes(Path.of(filePath)); + } catch (IOException ioe) { + throw new RuntimeException(format("File not found: %s", filePath)); } } @@ -293,81 +181,10 @@ private void updateToCompleted(List reconcilableTopics, Strin .withReplicasChange(null).build())); } - private void updateToFailed(List reconcilableTopics, String message, HttpResponse response) { - Optional error = parseErrorResponse(response); - String text = error.isPresent() ? format("%s, %s", message, error.get()) : message; - LOGGER.errorOp("{}, Topics: {}", text, topicNames(reconcilableTopics)); + private void updateToFailed(List reconcilableTopics, String message) { + LOGGER.errorOp("{}, Topics: {}", message, topicNames(reconcilableTopics)); reconcilableTopics.forEach(reconcilableTopic -> reconcilableTopic.kt().setStatus(new KafkaTopicStatusBuilder(reconcilableTopic.kt().getStatus()) - .editOrNewReplicasChange().withMessage(text).endReplicasChange().build())); - } - - private Optional parseErrorResponse(HttpResponse response) { - if (response != null && response.body() != null) { - try { - ErrorResponse errorResponse = mapper.readValue(response.body(), ErrorResponse.class); - if (errorResponse.errorMessage().contains("NotEnoughValidWindowsException")) { - return Optional.of("Cluster model not ready"); - } else if (errorResponse.errorMessage().contains("OngoingExecutionException") - || errorResponse.errorMessage().contains("stop_ongoing_execution")) { - return Optional.of("Another task is executing"); - } else { - return Optional.of(errorResponse.errorMessage()); - } - } catch (Throwable t) { - LOGGER.warnOp("Failed to parse error response: {}", t.getMessage()); - } - } - return Optional.empty(); - } - - private record ReplicationFactorChanges(@JsonProperty("replication_factor") ReplicationFactor replicationFactor) { } - private record ReplicationFactor(@JsonProperty("topic_by_replication_factor") Map topicByReplicationFactor) { } - - private record UserTasksResponse(List userTasks, int version) { } - private record UserTask( - @JsonProperty("Status") String status, - @JsonProperty("ClientIdentity") String clientIdentity, - @JsonProperty("RequestURL") String requestURL, - @JsonProperty("UserTaskId") String userTaskId, - @JsonProperty("StartMs") String startMs - ) { } - private record ErrorResponse(String stackTrace, String errorMessage, int version) { } - - private enum TaskState { - ACTIVE("Active"), - IN_EXECUTION("InExecution"), - COMPLETED("Completed"), - COMPLETED_WITH_ERROR("CompletedWithError"); - - private static final List CACHED_VALUES = List.of(values()); - private final String value; - TaskState(String value) { - this.value = value; - } - - @Override - public String toString() { - return value; - } - - /** - * Use this instead of values() to avoid creating a new array each time. - * @return enumerated values in the same order as values() - */ - public static List cachedValues() { - return CACHED_VALUES; - } - - /** - * Get the enum constant by value. - * @param value Value. - * @return Constant. - */ - public static TaskState get(String value) { - Optional constant = cachedValues().stream() - .filter(v -> v.toString().equals(value)).findFirst(); - return constant.orElseThrow(); - } + .editOrNewReplicasChange().withMessage(message).endReplicasChange().build())); } } diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java index c701ff577fa..b1bccc2c4c2 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java @@ -40,36 +40,38 @@ public class TopicOperatorMain implements Liveness, Readiness { private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(TopicOperatorMain.class); private final static long INFORMER_PERIOD_MS = 2_000; - private final ResourceEventHandler handler; private final String namespace; - private final KubernetesClient client; + private final KubernetesClient kubeClient; /* test */ final BatchingLoop queue; private final long resyncIntervalMs; private final BasicItemStore itemStore; + private final ReplicasChangeHandler replicasChangeHandler; /* test */ final BatchingTopicController controller; private final Admin admin; private SharedIndexInformer informer; // guarded by this Thread shutdownHook; // guarded by this + private final ResourceEventHandler resourceEventHandler; private final HealthCheckAndMetricsServer healthAndMetricsServer; TopicOperatorMain(String namespace, Map selector, Admin admin, - KubernetesClient client, + KubernetesClient kubeClient, TopicOperatorConfig config) { Objects.requireNonNull(namespace); Objects.requireNonNull(selector); this.namespace = namespace; - this.client = client; + this.kubeClient = kubeClient; this.resyncIntervalMs = config.fullReconciliationIntervalMs(); this.admin = admin; TopicOperatorMetricsProvider metricsProvider = createMetricsProvider(); TopicOperatorMetricsHolder metrics = new TopicOperatorMetricsHolder(KafkaTopic.RESOURCE_KIND, Labels.fromMap(selector), metricsProvider); - this.controller = new BatchingTopicController(config, selector, admin, client, metrics, new ReplicasChangeHandler(config)); + this.replicasChangeHandler = new ReplicasChangeHandler(config); + this.controller = new BatchingTopicController(config, selector, admin, kubeClient, metrics, replicasChangeHandler); this.itemStore = new BasicItemStore<>(Cache::metaNamespaceKeyFunc); this.queue = new BatchingLoop(config.maxQueueSize(), controller, 1, config.maxBatchSize(), config.maxBatchLingerMs(), itemStore, this::stop, metrics, namespace); - this.handler = new TopicOperatorEventHandler(config, queue, metrics); + this.resourceEventHandler = new TopicOperatorEventHandler(config, queue, metrics); this.healthAndMetricsServer = new HealthCheckAndMetricsServer(8080, this, this, metricsProvider); } @@ -86,7 +88,7 @@ synchronized void start() { healthAndMetricsServer.start(); LOGGER.infoOp("Starting queue"); queue.start(); - informer = Crds.topicOperation(client) + informer = Crds.topicOperation(kubeClient) .inNamespace(namespace) // Do NOT use withLabels to filter the informer, since the controller is stateful // (topics need to be added to removed from TopicController.topics if KafkaTopics transition between @@ -97,7 +99,7 @@ synchronized void start() { // is that the handler skips one informer intervals. Setting both intervals to the same value generates // just enough skew that when the informer checks if the handler is ready for resync it sees that // it still needs another couple of micro-seconds and skips to the next informer level resync. - .addEventHandlerWithResyncPeriod(handler, resyncIntervalMs + INFORMER_PERIOD_MS) + .addEventHandlerWithResyncPeriod(resourceEventHandler, resyncIntervalMs + INFORMER_PERIOD_MS) .itemStore(itemStore); LOGGER.infoOp("Starting informer"); informer.run(); @@ -131,6 +133,9 @@ private synchronized void shutdown() { informer.stop(); informer = null; } + if (replicasChangeHandler != null) { + replicasChangeHandler.stop(); + } this.queue.stop(); this.admin.close(); this.healthAndMetricsServer.stop(); diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorUtil.java b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorUtil.java index e635957b347..cd28d916d7b 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorUtil.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorUtil.java @@ -10,15 +10,10 @@ import io.strimzi.operator.common.Annotations; import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; -import static java.lang.String.format; - /** * Provides utility methods for managing and interacting with KafkaTopic resources within a Topic Operator context. * This class includes functionalities such as extracting topic names from KafkaTopic resources, managing reconciliation @@ -146,20 +141,6 @@ public static boolean isPaused(KafkaTopic kt) { return Annotations.isReconciliationPausedWithAnnotation(kt); } - /** - * Get file content. - * - * @param filePath Absolute file path. - * @return File content. - */ - public static byte[] getFileContent(String filePath) { - try { - return Files.readAllBytes(Path.of(filePath)); - } catch (IOException ioe) { - throw new RuntimeException(format("File not found: %s", filePath)); - } - } - /** * Whether the {@link KafkaTopic} status has replicas change. * diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/CruiseControlClient.java b/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/CruiseControlClient.java new file mode 100644 index 00000000000..7419692fe0b --- /dev/null +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/CruiseControlClient.java @@ -0,0 +1,158 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.topic.cruisecontrol; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.strimzi.api.kafka.model.topic.KafkaTopic; + +import java.net.http.HttpResponse; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Cruise Control REST API client. + *

+ * The server runs one task execution at a time, additional + * requests are queued up to {@code max.active.user.tasks}. + */ +public interface CruiseControlClient extends AutoCloseable { + /** + * HTTP request timeout in seconds. + */ + long HTTP_REQUEST_TIMEOUT_SEC = 60; + + /** + * Send a POST request to {@code topic_configuration} endpoint. + * This can be used to request replication factor changes. + * + * @param kafkaTopics List of Kafka topics. + * @return The user task id. + */ + String topicConfiguration(List kafkaTopics); + + /** + * Send a GET request to {@code user_tasks} endpoint. + * This can be used to check the asynchronous task execution result. + * + * @param userTaskIds List of user task ids. + * @return User tasks response. + */ + UserTasksResponse userTasks(List userTaskIds); + + /** + * Get the error message from HTTP response. + * + * @param response The HTTP response. + * @return The error message. + */ + Optional errorMessage(HttpResponse response); + + /** + * Topic names grouped by replication factor value. + * + * @param topicByReplicationFactor Topic names grouped by replication factor value. + */ + record ReplicationFactor(@JsonProperty("topic_by_replication_factor") Map topicByReplicationFactor) { } + + /** + * Replication factor changes. + * + * @param replicationFactor Replication factor value. + */ + record ReplicationFactorChanges(@JsonProperty("replication_factor") ReplicationFactor replicationFactor) { } + + /** + * The user task. + * + * @param status Status. + * @param clientIdentity Client identity. + * @param requestURL Request URL. + * @param userTaskId User task id. + * @param startMs Start time in ms. + */ + record UserTask( + @JsonProperty("Status") String status, + @JsonProperty("ClientIdentity") String clientIdentity, + @JsonProperty("RequestURL") String requestURL, + @JsonProperty("UserTaskId") String userTaskId, + @JsonProperty("StartMs") String startMs + ) { } + + /** + * The user tasks response. + * + * @param userTasks User task list. + * @param version Version. + */ + record UserTasksResponse(List userTasks, int version) { } + + /** + * The error response. + * + * @param stackTrace Stack trace. + * @param errorMessage Error message. + * @param version Version. + */ + record ErrorResponse(String stackTrace, String errorMessage, int version) { } + + /** + * Task states. + */ + enum TaskState { + /** + * The task has been accepted and waiting for execution. + */ + ACTIVE("Active"), + + /** + * The task is being executed. + */ + IN_EXECUTION("InExecution"), + + /** + * The task has been completed. + */ + COMPLETED("Completed"), + + /** + * The task has been completed with errors. + */ + COMPLETED_WITH_ERROR("CompletedWithError"); + + private static final List CACHED_VALUES = List.of(values()); + private final String value; + TaskState(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + /** + * Use this instead of values() to avoid creating a new array each time. + * + * @return enumerated values in the same order as values() + */ + public static List cachedValues() { + return CACHED_VALUES; + } + + /** + * Get the enum constant by value. + * + * @param value Value. + * @return Constant. + */ + public static TaskState get(String value) { + Optional constant = cachedValues().stream() + .filter(v -> v.toString().equals(value)).findFirst(); + return constant.orElseThrow(); + } + } +} + diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/CruiseControlClientImpl.java b/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/CruiseControlClientImpl.java new file mode 100644 index 00000000000..c75675dec9e --- /dev/null +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/CruiseControlClientImpl.java @@ -0,0 +1,271 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.topic.cruisecontrol; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.strimzi.api.kafka.model.topic.KafkaTopic; +import io.strimzi.operator.common.ReconciliationLogger; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlEndpoints; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlHeaders; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlParameters; +import io.strimzi.operator.topic.TopicOperatorUtil; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import java.io.ByteArrayInputStream; +import java.net.ConnectException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static io.strimzi.operator.common.CruiseControlUtil.buildBasicAuthValue; +import static java.lang.String.format; +import static java.util.stream.Collectors.groupingBy; +import static org.apache.logging.log4j.core.util.Throwables.getRootCause; + +/** + * Cruise Control REST API client based on Java HTTP client. + */ +public class CruiseControlClientImpl implements CruiseControlClient { + private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(CruiseControlClientImpl.class); + + private String serverHostname; + private int serverPort; + private boolean rackEnabled; + private boolean sslEnabled; + private byte[] sslCertificate; + private boolean authEnabled; + private String authUsername; + private String authPassword; + + private ExecutorService httpClientExecutor; + private HttpClient httpClient; + private ObjectMapper objectMapper; + + /** + * Create Cruise Control client instance. + * + * @param serverHostname Server hostname. + * @param serverPort Server port. + * @param rackEnabled Whether rack awareness is enabled. + * @param sslEnabled Whether SSL is enabled. + * @param sslCertificate SSL certificate. + * @param authEnabled Whether authentication is enabled. + * @param authUsername Authentication username. + * @param authPassword Authentication password. + */ + public CruiseControlClientImpl(String serverHostname, + int serverPort, + boolean rackEnabled, + boolean sslEnabled, + byte[] sslCertificate, + boolean authEnabled, + String authUsername, + String authPassword) { + this.serverHostname = serverHostname; + this.serverPort = serverPort; + this.rackEnabled = rackEnabled; + this.sslEnabled = sslEnabled; + this.sslCertificate = sslCertificate; + this.authEnabled = authEnabled; + this.authUsername = authUsername; + this.authPassword = authPassword; + this.httpClientExecutor = Executors.newCachedThreadPool(); + this.httpClient = buildHttpClient(); + this.objectMapper = new ObjectMapper(); + } + + @Override + public void close() throws Exception { + stopExecutor(httpClientExecutor, 10_000); + httpClient = null; + } + + @Override + public String topicConfiguration(List kafkaTopics) { + // compute payload + Map> topicsByReplicas = kafkaTopics.stream() + .collect(groupingBy(kt -> kt.getSpec().getReplicas())); + Map requestPayload = new HashMap<>(); + topicsByReplicas.entrySet().forEach(es -> { + int rf = es.getKey(); + List targetNames = topicsByReplicas.get(rf) + .stream().map(TopicOperatorUtil::topicName).collect(Collectors.toList()); + requestPayload.put(rf, String.join("|", targetNames)); + }); + String jsonPayload; + try { + jsonPayload = objectMapper.writeValueAsString( + new ReplicationFactorChanges(new ReplicationFactor(requestPayload))); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize request"); + } + + // build request + URI requestUri = new UrlBuilder(serverHostname, serverPort, CruiseControlEndpoints.TOPIC_CONFIGURATION, sslEnabled) + .withParameter(CruiseControlParameters.SKIP_RACK_AWARENESS_CHECK, String.valueOf(!rackEnabled)) + .withParameter(CruiseControlParameters.DRY_RUN, "false") + .withParameter(CruiseControlParameters.JSON, "true") + .build(); + LOGGER.traceOp("Request URI: {}", requestUri.toString()); + + HttpRequest.Builder builder = HttpRequest.newBuilder() + .uri(requestUri) + .timeout(Duration.of(HTTP_REQUEST_TIMEOUT_SEC, ChronoUnit.SECONDS)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(jsonPayload)); + if (authEnabled) { + builder.header("Authorization", buildBasicAuthValue(authUsername, authPassword)); + } + HttpRequest request = builder.build(); + + // send request and handle response + return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).thenApply(response -> { + LOGGER.traceOp("Response: {}", response); + if (response.statusCode() != 200) { + Optional error = errorMessage(response); + throw new RuntimeException(error.isPresent() + ? format("Request failed (%s), %s", response.statusCode(), error.get()) + : format("Request failed (%s)", response.statusCode()) + ); + } + return response.headers().firstValue(CruiseControlHeaders.USER_TASK_ID_HEADER.toString()).get(); + }).exceptionally(t -> { + if (t.getCause() instanceof ConnectException) { + throw new RuntimeException("Connection failed"); + } else { + throw new RuntimeException(getRootCause(t).getMessage()); + } + }).join(); + } + + @Override + public UserTasksResponse userTasks(List userTaskIds) { + // build request + URI requestUrl = new UrlBuilder(serverHostname, serverPort, CruiseControlEndpoints.USER_TASKS, sslEnabled) + .withParameter(CruiseControlParameters.USER_TASK_IDS, userTaskIds) + .withParameter(CruiseControlParameters.FETCH_COMPLETE, "false") + .withParameter(CruiseControlParameters.JSON, "true") + .build(); + LOGGER.traceOp("Request URL: {}", requestUrl.toString()); + + HttpRequest.Builder builder = HttpRequest.newBuilder() + .uri(requestUrl) + .timeout(Duration.of(HTTP_REQUEST_TIMEOUT_SEC, ChronoUnit.SECONDS)) + .GET(); + if (authEnabled) { + builder.header("Authorization", buildBasicAuthValue(authUsername, authPassword)); + } + HttpRequest request = builder.build(); + + // send request and handle response + return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).thenApply(response -> { + LOGGER.traceOp("Response: {}", response); + if (response.statusCode() != 200) { + Optional error = errorMessage(response); + throw new RuntimeException(error.isPresent() + ? format("Request failed (%s), %s", response.statusCode(), error.get()) + : format("Request failed (%s)", response.statusCode()) + ); + } + + try { + return objectMapper.readValue(response.body(), UserTasksResponse.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(format("Response deserialization failed, %s", e.getMessage())); + } + }).exceptionally(t -> { + if (t.getCause() instanceof ConnectException) { + throw new RuntimeException("Connection failed"); + } else { + throw new RuntimeException(getRootCause(t).getMessage()); + } + }).join(); + } + + public Optional errorMessage(HttpResponse response) { + if (response != null && response.body() != null && !response.body().isBlank()) { + try { + ErrorResponse errorResponse = objectMapper.readValue(response.body(), ErrorResponse.class); + if (errorResponse.errorMessage() != null) { + if (errorResponse.errorMessage().contains("NotEnoughValidWindowsException")) { + return Optional.of("Cluster model not ready"); + } else if (errorResponse.errorMessage().contains("OngoingExecutionException") + || errorResponse.errorMessage().contains("stop_ongoing_execution")) { + return Optional.of("Another task is executing"); + } else { + return Optional.of(errorResponse.errorMessage()); + } + } + } catch (Throwable t) { + throw new RuntimeException(format("Error message parsing failed: %s", t.getMessage())); + } + } + return Optional.empty(); + } + + @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") + private HttpClient buildHttpClient() { + try { + HttpClient.Builder builder = HttpClient.newBuilder().executor(httpClientExecutor); + if (sslEnabled) { + // load the certificate chain to be trusted + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Certificate ca; + try (var caInput = new ByteArrayInputStream(sslCertificate)) { + ca = cf.generateCertificate(caInput); + } + + // create a P12 keystore containing our trusted chain + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + keyStore.load(null, null); + keyStore.setCertificateEntry("ca", ca); + + // create a trust manager that trusts the chain in our keystore + TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); + tmf.init(keyStore); + + // create an SSL context that uses our trust manager + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, tmf.getTrustManagers(), null); + builder.sslContext(sslContext); + } + return builder.build(); + } catch (Throwable t) { + throw new RuntimeException(format("HTTP client build failed: %s", t.getMessage())); + } + } + + private static void stopExecutor(ExecutorService executor, long timeoutMs) { + if (executor == null || timeoutMs < 0) { + return; + } + try { + executor.shutdown(); + executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + if (!executor.isTerminated()) { + executor.shutdownNow(); + } + } + } +} diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/UrlBuilder.java b/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/UrlBuilder.java new file mode 100644 index 00000000000..7b776c103d5 --- /dev/null +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/cruisecontrol/UrlBuilder.java @@ -0,0 +1,88 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.topic.cruisecontrol; + +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlEndpoints; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlParameters; + +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.util.List; + +import static java.lang.String.format; + +/** + * Cruise Control URL builder. + */ +public class UrlBuilder { + private String uri; + private boolean firstParam; + + /** + * Create a URL builder instance. + * + * @param hostname Server hostname. + * @param port Server port. + * @param endpoint Endpoint name. + * @param ssl Whether SSL should be used. + */ + public UrlBuilder(String hostname, int port, CruiseControlEndpoints endpoint, boolean ssl) { + uri = format("%s://%s:%d%s?", ssl ? "https" : "http", hostname, port, endpoint); + firstParam = true; + } + + /** + * Add query parameter with value to the path. + * + * @param param Cruise Control parameter. + * @param value Parameter value. + * + * @return Instance of this builder. + */ + public UrlBuilder withParameter(CruiseControlParameters param, String value) { + if (!firstParam) { + uri += "&"; + } else { + firstParam = false; + } + try { + uri += param.asPair(value); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage()); + } + return this; + } + + /** + * Add query parameter with multiple values to the path. + * + * @param param Cruise Control parameter. + * @param values List of parameter values. + * + * @return Instance of this builder. + */ + public UrlBuilder withParameter(CruiseControlParameters param, List values) { + if (!firstParam) { + uri += "&"; + } else { + firstParam = false; + } + try { + uri += param.asList(values); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage()); + } + return this; + } + + /** + * Build the URL. + * + * @return The URL as URI instance. + */ + public URI build() { + return URI.create(uri); + } +} diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/MockCruiseControl.java b/topic-operator/src/test/java/io/strimzi/operator/topic/MockCruiseControl.java index da6baa34b12..eccf90bb65b 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/MockCruiseControl.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/MockCruiseControl.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.LogManager; +import static io.strimzi.operator.topic.TopicOperatorTestUtil.contentFromTextFile; import static java.nio.charset.StandardCharsets.UTF_8; /** @@ -101,7 +102,7 @@ public void expectTopicConfigSuccessResponse(File apiUserFile, File apiPassFile) .withPath(CruiseControlEndpoints.TOPIC_CONFIGURATION.toString()) .withContentType(MediaType.APPLICATION_JSON) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() @@ -139,7 +140,7 @@ public void expectTopicConfigSuccessResponse(File apiUserFile, File apiPassFile) .withPath(CruiseControlEndpoints.TOPIC_CONFIGURATION.toString()) .withContentType(MediaType.APPLICATION_JSON) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile))))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile))))) .respond( HttpResponse.response() .withStatusCode(HttpStatusCode.OK_200.code()) @@ -159,7 +160,7 @@ public void expectTopicConfigErrorResponse(File apiUserFile, File apiPassFile) { .withPath(CruiseControlEndpoints.TOPIC_CONFIGURATION.toString()) .withContentType(MediaType.APPLICATION_JSON) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() @@ -180,7 +181,7 @@ public void expectTopicConfigRequestTimeout(File apiUserFile, File apiPassFile) .withPath(CruiseControlEndpoints.TOPIC_CONFIGURATION.toString()) .withContentType(MediaType.APPLICATION_JSON) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() @@ -199,7 +200,7 @@ public void expectTopicConfigRequestUnauthorized(File apiUserFile, File apiPassF .withPath(CruiseControlEndpoints.TOPIC_CONFIGURATION.toString()) .withContentType(MediaType.APPLICATION_JSON) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() @@ -231,7 +232,7 @@ public void expectUserTasksSuccessResponse(File apiUserFile, File apiPassFile) { .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true")) .withPath(CruiseControlEndpoints.USER_TASKS.toString()) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() @@ -263,7 +264,7 @@ public void expectUserTasksSuccessResponse(File apiUserFile, File apiPassFile) { .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true")) .withPath(CruiseControlEndpoints.USER_TASKS.toString()) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile))))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile))))) .respond( HttpResponse.response() .withStatusCode(HttpStatusCode.OK_200.code()) @@ -280,7 +281,7 @@ public void expectUserTasksErrorResponse(File apiUserFile, File apiPassFile) { .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true")) .withPath(CruiseControlEndpoints.USER_TASKS.toString()) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() @@ -298,7 +299,7 @@ public void expectUserTasksRequestTimeout(File apiUserFile, File apiPassFile) { .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true")) .withPath(CruiseControlEndpoints.USER_TASKS.toString()) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() @@ -315,7 +316,7 @@ public void expectUserTasksRequestUnauthorized(File apiUserFile, File apiPassFil .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true")) .withPath(CruiseControlEndpoints.USER_TASKS.toString()) .withHeader(new Header("Authorization", - CruiseControlUtil.buildBasicAuthValue(TopicOperatorTestUtil.contentFromTextFile(apiUserFile), TopicOperatorTestUtil.contentFromTextFile(apiPassFile)))) + CruiseControlUtil.buildBasicAuthValue(contentFromTextFile(apiUserFile), contentFromTextFile(apiPassFile)))) .withSecure(true)) .respond( HttpResponse.response() diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/ReplicasChangeHandlerTest.java b/topic-operator/src/test/java/io/strimzi/operator/topic/ReplicasChangeHandlerTest.java index 7962cc923ff..8c03dcda84e 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/ReplicasChangeHandlerTest.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/ReplicasChangeHandlerTest.java @@ -34,6 +34,7 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; public class ReplicasChangeHandlerTest { private static final String TEST_NAMESPACE = "replicas-change"; @@ -51,7 +52,7 @@ public static void beforeAll() throws IOException { File tlsKeyFile = TestUtils.tempFile(ReplicasChangeHandlerTest.class.getSimpleName(), ".key"); tlsCrtFile = TestUtils.tempFile(ReplicasChangeHandlerTest.class.getSimpleName(), ".crt"); - new MockCertManager().generateSelfSignedCert(tlsKeyFile, tlsCrtFile, + new MockCertManager().generateSelfSignedCert(tlsKeyFile, tlsCrtFile, new Subject.Builder().withCommonName("Trusted Test CA").build(), 365); apiUserFile = TestUtils.tempFile(ReplicasChangeHandlerTest.class.getSimpleName(), ".username"); @@ -106,16 +107,9 @@ public void shouldFailWithSslEnabledAndMissingCrtFile() { entry(TopicOperatorConfig.CRUISE_CONTROL_SSL_ENABLED.key(), "true"), entry(TopicOperatorConfig.CRUISE_CONTROL_CRT_FILE_PATH.key(), "/invalid/ca.crt") )); - - var handler = new ReplicasChangeHandler(config); - - var pending = buildPendingReconcilableTopics(); - var pendingAndOngoing = handler.requestPendingChanges(pending); - assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, File not found: /invalid/ca.crt"); - var ongoing = buildOngoingReconcilableTopics(); - var completedAndFailed = handler.requestOngoingChanges(ongoing); - assertFailedWithMessage(completedAndFailed, "Replicas change failed, File not found: /invalid/ca.crt"); + RuntimeException thrown = assertThrows(RuntimeException.class, () -> new ReplicasChangeHandler(config)); + assertThat(thrown.getMessage(), is("File not found: /invalid/ca.crt")); } @Test @@ -129,16 +123,9 @@ public void shouldFailWithAuthEnabledAndUsernameFileNotFound() { entry(TopicOperatorConfig.CRUISE_CONTROL_API_USER_PATH.key(), "/invalid/username"), entry(TopicOperatorConfig.CRUISE_CONTROL_API_PASS_PATH.key(), apiPassFile.getAbsolutePath()) )); - - var handler = new ReplicasChangeHandler(config); - - var pending = buildPendingReconcilableTopics(); - var pendingAndOngoing = handler.requestPendingChanges(pending); - assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, File not found: /invalid/username"); - var ongoing = buildOngoingReconcilableTopics(); - var completedAndFailed = handler.requestOngoingChanges(ongoing); - assertFailedWithMessage(completedAndFailed, "Replicas change failed, File not found: /invalid/username"); + RuntimeException thrown = assertThrows(RuntimeException.class, () -> new ReplicasChangeHandler(config)); + assertThat(thrown.getMessage(), is("File not found: /invalid/username")); } @Test @@ -152,16 +139,9 @@ public void shouldFailWithAuthEnabledAndPasswordFileNotFound() { entry(TopicOperatorConfig.CRUISE_CONTROL_API_USER_PATH.key(), apiUserFile.getAbsolutePath()), entry(TopicOperatorConfig.CRUISE_CONTROL_API_PASS_PATH.key(), "/invalid/password") )); - - var handler = new ReplicasChangeHandler(config); - - var pending = buildPendingReconcilableTopics(); - var pendingAndOngoing = handler.requestPendingChanges(pending); - assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, File not found: /invalid/password"); - var ongoing = buildOngoingReconcilableTopics(); - var completedAndFailed = handler.requestOngoingChanges(ongoing); - assertFailedWithMessage(completedAndFailed, "Replicas change failed, File not found: /invalid/password"); + RuntimeException thrown = assertThrows(RuntimeException.class, () -> new ReplicasChangeHandler(config)); + assertThat(thrown.getMessage(), is("File not found: /invalid/password")); } @Test @@ -177,11 +157,11 @@ public void shouldFailWhenCruiseControlEndpointNotReachable() { var pending = buildPendingReconcilableTopics(); var pendingAndOngoing = handler.requestPendingChanges(pending); - assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, java.net.ConnectException"); + assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, Connection failed"); var ongoing = buildOngoingReconcilableTopics(); var completedAndFailed = handler.requestOngoingChanges(ongoing); - assertFailedWithMessage(completedAndFailed, "Replicas change failed, java.net.ConnectException"); + assertFailedWithMessage(completedAndFailed, "Replicas change failed, Connection failed"); } @Test @@ -203,13 +183,13 @@ public void shouldFailWhenCruiseControlReturnsErrorResponse() { server.expectTopicConfigErrorResponse(apiUserFile, apiPassFile); var pending = buildPendingReconcilableTopics(); var pendingAndOngoing = handler.requestPendingChanges(pending); - assertFailedWithMessage(pendingAndOngoing, "Replicas change failed (500), Cluster model not ready"); + assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, Request failed (500), Cluster model not ready"); server.expectUserTasksErrorResponse(apiUserFile, apiPassFile); var ongoing = buildOngoingReconcilableTopics(); var completedAndFailed = handler.requestOngoingChanges(ongoing); - assertFailedWithMessage(completedAndFailed, "Replicas change failed (500), Error processing GET " + - "request '/user_tasks' due to: 'Error happened in fetching response for task 9730e4fb-ea41-4e2d-b053-9be2310589b5'."); + assertFailedWithMessage(completedAndFailed, "Replicas change failed, Request failed (500), " + + "Error processing GET request '/user_tasks' due to: 'Error happened in fetching response for task 9730e4fb-ea41-4e2d-b053-9be2310589b5'."); } @Test @@ -231,12 +211,12 @@ public void shouldFailWhenTheRequestTimesOut() { server.expectTopicConfigRequestTimeout(apiUserFile, apiPassFile); var pending = buildPendingReconcilableTopics(); var pendingAndOngoing = handler.requestPendingChanges(pending); - assertFailedWithMessage(pendingAndOngoing, "Replicas change failed (408)"); + assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, Request failed (408)"); server.expectUserTasksRequestTimeout(apiUserFile, apiPassFile); var ongoing = buildOngoingReconcilableTopics(); var completedAndFailed = handler.requestOngoingChanges(ongoing); - assertFailedWithMessage(completedAndFailed, "Replicas change failed (408)"); + assertFailedWithMessage(completedAndFailed, "Replicas change failed, Request failed (408)"); } @Test @@ -258,12 +238,12 @@ public void shouldFailWhenTheRequestIsUnauthorized() { server.expectTopicConfigRequestUnauthorized(apiUserFile, apiPassFile); var pending = buildPendingReconcilableTopics(); var pendingAndOngoing = handler.requestPendingChanges(pending); - assertFailedWithMessage(pendingAndOngoing, "Replicas change failed (401)"); + assertFailedWithMessage(pendingAndOngoing, "Replicas change failed, Request failed (401)"); server.expectUserTasksRequestUnauthorized(apiUserFile, apiPassFile); var ongoing = buildOngoingReconcilableTopics(); var completedAndFailed = handler.requestOngoingChanges(ongoing); - assertFailedWithMessage(completedAndFailed, "Replicas change failed (401)"); + assertFailedWithMessage(completedAndFailed, "Replicas change failed, Request failed (401)"); } private static void assertOngoing(List input, List output) { diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/cruisecontrol/UrlBuilderTest.java b/topic-operator/src/test/java/io/strimzi/operator/topic/cruisecontrol/UrlBuilderTest.java new file mode 100644 index 00000000000..98c03c5d404 --- /dev/null +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/cruisecontrol/UrlBuilderTest.java @@ -0,0 +1,72 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.topic.cruisecontrol; + +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlEndpoints; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlParameters; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class UrlBuilderTest { + private static final List GOALS = Arrays.asList("goal.one", "goal.two", "goal.three", "goal.four", "goal.five"); + + @Test + public void testUriWithSingleValueQueryParameters() { + String expectedUrl = "https://localhost:9090" + + CruiseControlEndpoints.STATE + "?" + + CruiseControlParameters.JSON + "=true&" + + CruiseControlParameters.DRY_RUN + "=false&" + + CruiseControlParameters.VERBOSE + "=false"; + + URI uri = new UrlBuilder("localhost", 9090, CruiseControlEndpoints.STATE, true) + .withParameter(CruiseControlParameters.JSON, "true") + .withParameter(CruiseControlParameters.DRY_RUN, "false") + .withParameter(CruiseControlParameters.VERBOSE, "false") + .build(); + assertThat(uri.toString(), is(expectedUrl)); + } + + @Test + public void testUriWithListQueryParameters() { + String expectedUrl = "https://localhost:9090" + + CruiseControlEndpoints.REBALANCE + "?" + + CruiseControlParameters.JSON + "=true&" + + CruiseControlParameters.DRY_RUN + "=false&" + + CruiseControlParameters.VERBOSE + "=true&" + + CruiseControlParameters.SKIP_HARD_GOAL_CHECK + "=false&" + + CruiseControlParameters.EXCLUDED_TOPICS + "=test-.*&" + + CruiseControlParameters.GOALS + "="; + + StringBuilder goalStringBuilder = new StringBuilder(); + for (int i = 0; i < GOALS.size(); i++) { + goalStringBuilder.append(GOALS.get(i)); + if (i < GOALS.size() - 1) { + goalStringBuilder.append(","); + } + } + + expectedUrl += URLEncoder.encode(goalStringBuilder.toString(), StandardCharsets.UTF_8) + "&"; + expectedUrl += CruiseControlParameters.REBALANCE_DISK + "=false"; + + URI uri = new UrlBuilder("localhost", 9090, CruiseControlEndpoints.REBALANCE, true) + .withParameter(CruiseControlParameters.JSON, "true") + .withParameter(CruiseControlParameters.DRY_RUN, "false") + .withParameter(CruiseControlParameters.VERBOSE, "true") + .withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, "false") + .withParameter(CruiseControlParameters.EXCLUDED_TOPICS, "test-.*") + .withParameter(CruiseControlParameters.GOALS, GOALS) + .withParameter(CruiseControlParameters.REBALANCE_DISK, "false") + .build(); + assertThat(uri.toString(), is(expectedUrl)); + } +}