From bd045900e95ee05083c1c7b928ebc3e1966e967f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E8=B4=B5=E5=8F=91?= Date: Wed, 15 Jan 2025 14:33:30 +0800 Subject: [PATCH] [Feature] Flink Kubernetes opoerator supports ingress (#4140) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 李贵发 --- .../org/dinky/data/model/JarSubmitParam.java | 4 ++ .../KubernetesApplicationOperatorGateway.java | 36 ++++++++----- .../operator/KubernetesOperatorGateway.java | 50 +++++++++++++++++++ .../ConfigurationForm/FlinkK8s/index.tsx | 4 +- 4 files changed, 80 insertions(+), 14 deletions(-) diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java index 8da95a15d9..e46a13f20a 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java @@ -106,4 +106,8 @@ public String getStatement() { getArgs(), getAllowNonRestoredState()); } + + public String toString() { + return this.getStatement(); + } } diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java index 50ac313ce9..25e88a3556 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java @@ -29,6 +29,7 @@ import org.dinky.utils.JsonUtils; import org.dinky.utils.LogUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import java.util.Collections; @@ -127,19 +128,25 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { // sleep a time ,because some time the service will not be found Thread.sleep(3000); - // get jobmanager addr by service - ListOptions options = new ListOptions(); - String serviceName = config.getFlinkConfig().getJobName() + "-rest"; - options.setFieldSelector("metadata.name=" + serviceName); - ServiceList list = kubernetesClient - .services() - // fixed bug can't find service list #3700 - .inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE)) - .list(options); - if (Objects.nonNull(list) && list.getItems().isEmpty()) { - throw new RuntimeException("service list is empty, please check svc list is exists"); + ServiceList list = null; + String ipPort = null; + if (!(StringUtils.isNotEmpty(getIngressUrl()) && pingIpPort(getIngressUrl()))) { + // get jobmanager addr by service + ListOptions options = new ListOptions(); + String serviceName = config.getFlinkConfig().getJobName() + "-rest"; + options.setFieldSelector("metadata.name=" + serviceName); + list = kubernetesClient + .services() + // fixed bug can't find service list #3700 + .inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE)) + .list(options); + if (Objects.nonNull(list) && list.getItems().isEmpty()) { + throw new RuntimeException("service list is empty, please check svc list is exists"); + } + ipPort = getWebUrl(list, kubernetesClient); + } else { + ipPort = getIngressUrl(); } - String ipPort = getWebUrl(list, kubernetesClient); result.setWebURL("http://" + ipPort); result.setId(result.getJids().get(0) + System.currentTimeMillis()); result.success(); @@ -163,6 +170,11 @@ public String getWebUrl(ServiceList list, KubernetesClient kubernetesClient) { StringBuilder ipPort = new StringBuilder(); StringBuilder svcRestPort = new StringBuilder(); StringBuilder svcType = new StringBuilder(); + + if (StringUtils.isNotEmpty(getIngressUrl()) && pingIpPort(getIngressUrl())) { + return getIngressUrl(); + } + logger.debug("kubernetes service list : [{}] \n kubernetesClient: [{}]", list, kubernetesClient); for (Service item : list.getItems()) { svcRestPort diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java index 91136ce3b5..c85bfba774 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java @@ -19,6 +19,9 @@ package org.dinky.gateway.kubernetes.operator; +import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_DOMAIN_KEY; +import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_ENABLED_KEY; + import org.dinky.assertion.Asserts; import org.dinky.data.enums.JobStatus; import org.dinky.gateway.enums.UpgradeMode; @@ -27,10 +30,13 @@ import org.dinky.gateway.kubernetes.operator.api.AbstractPodSpec.Resource; import org.dinky.gateway.kubernetes.operator.api.FlinkDeployment; import org.dinky.gateway.kubernetes.operator.api.FlinkDeploymentSpec; +import org.dinky.gateway.kubernetes.operator.api.IngressSpec; import org.dinky.gateway.kubernetes.operator.api.JobSpec; import org.dinky.gateway.result.SavePointResult; import org.dinky.gateway.result.TestResult; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; @@ -38,6 +44,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -48,6 +55,7 @@ import com.google.common.collect.Lists; +import cn.hutool.core.text.StrFormatter; import io.fabric8.kubernetes.api.model.ObjectMeta; import lombok.Data; import lombok.EqualsAndHashCode; @@ -220,6 +228,8 @@ private void initSpec() { flinkDeploymentSpec.setImage(image); flinkDeployment.setSpec(flinkDeploymentSpec); + flinkDeploymentSpec.setIngress(this.buildIngress()); + if (Asserts.isNotNull(serviceAccount)) { flinkDeploymentSpec.setServiceAccount(serviceAccount); logger.info("serviceAccount is : {}", serviceAccount); @@ -229,6 +239,46 @@ private void initSpec() { } } + private IngressSpec buildIngress() { + String ingressDomain = checkUseIngress(); + if (StringUtils.isNotEmpty(ingressDomain)) { + IngressSpec ingressSpec = new IngressSpec(); + ingressSpec.setTemplate(ingressDomain + "/{{namespace}}/{{name}}(/|$)(.*)"); + ingressSpec.setClassName("nginx"); + HashMap annotations = new HashMap(); + annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2"); + ingressSpec.setAnnotations(annotations); + return ingressSpec; + } + return null; + } + + private String checkUseIngress() { + Map ingressConfig = k8sConfig.getIngressConfig(); + if (MapUtils.isNotEmpty(ingressConfig)) { + boolean ingressEnable = Boolean.parseBoolean(ingressConfig + .getOrDefault(DINKY_K8S_INGRESS_ENABLED_KEY, "false") + .toString()); + String ingressDomain = ingressConfig + .getOrDefault(DINKY_K8S_INGRESS_DOMAIN_KEY, StringUtils.EMPTY) + .toString(); + if (ingressEnable && StringUtils.isNotEmpty(ingressDomain)) { + return ingressDomain; + } + } + return StringUtils.EMPTY; + } + + protected String getIngressUrl() { + String jobName = config.getFlinkConfig().getJobName(); + String nameSpace = kubernetesConfiguration.get("kubernetes.namespace"); + String ingressDomain = checkUseIngress(); + if (StringUtils.isNotEmpty(ingressDomain)) { + return StrFormatter.format("{}/{}/{}", ingressDomain, nameSpace, jobName); + } + return StringUtils.EMPTY; + } + private void initBase() { flinkDeployment.setApiVersion("flink.apache.org/v1beta1"); flinkDeployment.setKind("FlinkDeployment"); diff --git a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/index.tsx b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/index.tsx index 9ab5c0cba3..6b429594b7 100644 --- a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/index.tsx +++ b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/index.tsx @@ -201,7 +201,7 @@ export default (props: { placeholder={l('rc.cc.flinkConfigPathPlaceholder')} tooltip={l('rc.cc.flinkConfigPathHelp')} /> - {type && type === ClusterType.KUBERNETES_APPLICATION && ( + {type && (type === ClusterType.KUBERNETES_APPLICATION || type === ClusterType.KUBERNETES_OPERATOR) && ( )} - {type && type === ClusterType.KUBERNETES_APPLICATION && ingressEnabled && ( + {type && (type === ClusterType.KUBERNETES_APPLICATION || type === ClusterType.KUBERNETES_OPERATOR) && ingressEnabled && (