Skip to content

Commit

Permalink
[Feature] Flink Kubernetes opoerator supports ingress (DataLinkDC#4140)
Browse files Browse the repository at this point in the history
Co-authored-by: 李贵发 <[email protected]>
  • Loading branch information
liguifa and 李贵发 authored Jan 15, 2025
1 parent 7f6faa1 commit bd04590
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,8 @@ public String getStatement() {
getArgs(),
getAllowNonRestoredState());
}

public String toString() {
return this.getStatement();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LogUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

import java.util.Collections;
Expand Down Expand Up @@ -127,19 +128,25 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
// sleep a time ,because some time the service will not be found
Thread.sleep(3000);

// get jobmanager addr by service
ListOptions options = new ListOptions();
String serviceName = config.getFlinkConfig().getJobName() + "-rest";
options.setFieldSelector("metadata.name=" + serviceName);
ServiceList list = kubernetesClient
.services()
// fixed bug can't find service list #3700
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.list(options);
if (Objects.nonNull(list) && list.getItems().isEmpty()) {
throw new RuntimeException("service list is empty, please check svc list is exists");
ServiceList list = null;
String ipPort = null;
if (!(StringUtils.isNotEmpty(getIngressUrl()) && pingIpPort(getIngressUrl()))) {
// get jobmanager addr by service
ListOptions options = new ListOptions();
String serviceName = config.getFlinkConfig().getJobName() + "-rest";
options.setFieldSelector("metadata.name=" + serviceName);
list = kubernetesClient
.services()
// fixed bug can't find service list #3700
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.list(options);
if (Objects.nonNull(list) && list.getItems().isEmpty()) {
throw new RuntimeException("service list is empty, please check svc list is exists");
}
ipPort = getWebUrl(list, kubernetesClient);
} else {
ipPort = getIngressUrl();
}
String ipPort = getWebUrl(list, kubernetesClient);
result.setWebURL("http://" + ipPort);
result.setId(result.getJids().get(0) + System.currentTimeMillis());
result.success();
Expand All @@ -163,6 +170,11 @@ public String getWebUrl(ServiceList list, KubernetesClient kubernetesClient) {
StringBuilder ipPort = new StringBuilder();
StringBuilder svcRestPort = new StringBuilder();
StringBuilder svcType = new StringBuilder();

if (StringUtils.isNotEmpty(getIngressUrl()) && pingIpPort(getIngressUrl())) {
return getIngressUrl();
}

logger.debug("kubernetes service list : [{}] \n kubernetesClient: [{}]", list, kubernetesClient);
for (Service item : list.getItems()) {
svcRestPort
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.dinky.gateway.kubernetes.operator;

import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_DOMAIN_KEY;
import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_ENABLED_KEY;

import org.dinky.assertion.Asserts;
import org.dinky.data.enums.JobStatus;
import org.dinky.gateway.enums.UpgradeMode;
Expand All @@ -27,17 +30,21 @@
import org.dinky.gateway.kubernetes.operator.api.AbstractPodSpec.Resource;
import org.dinky.gateway.kubernetes.operator.api.FlinkDeployment;
import org.dinky.gateway.kubernetes.operator.api.FlinkDeploymentSpec;
import org.dinky.gateway.kubernetes.operator.api.IngressSpec;
import org.dinky.gateway.kubernetes.operator.api.JobSpec;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.gateway.result.TestResult;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -48,6 +55,7 @@

import com.google.common.collect.Lists;

import cn.hutool.core.text.StrFormatter;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -220,6 +228,8 @@ private void initSpec() {
flinkDeploymentSpec.setImage(image);
flinkDeployment.setSpec(flinkDeploymentSpec);

flinkDeploymentSpec.setIngress(this.buildIngress());

if (Asserts.isNotNull(serviceAccount)) {
flinkDeploymentSpec.setServiceAccount(serviceAccount);
logger.info("serviceAccount is : {}", serviceAccount);
Expand All @@ -229,6 +239,46 @@ private void initSpec() {
}
}

private IngressSpec buildIngress() {
String ingressDomain = checkUseIngress();
if (StringUtils.isNotEmpty(ingressDomain)) {
IngressSpec ingressSpec = new IngressSpec();
ingressSpec.setTemplate(ingressDomain + "/{{namespace}}/{{name}}(/|$)(.*)");
ingressSpec.setClassName("nginx");
HashMap annotations = new HashMap<String, String>();
annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2");
ingressSpec.setAnnotations(annotations);
return ingressSpec;
}
return null;
}

private String checkUseIngress() {
Map<String, Object> ingressConfig = k8sConfig.getIngressConfig();
if (MapUtils.isNotEmpty(ingressConfig)) {
boolean ingressEnable = Boolean.parseBoolean(ingressConfig
.getOrDefault(DINKY_K8S_INGRESS_ENABLED_KEY, "false")
.toString());
String ingressDomain = ingressConfig
.getOrDefault(DINKY_K8S_INGRESS_DOMAIN_KEY, StringUtils.EMPTY)
.toString();
if (ingressEnable && StringUtils.isNotEmpty(ingressDomain)) {
return ingressDomain;
}
}
return StringUtils.EMPTY;
}

protected String getIngressUrl() {
String jobName = config.getFlinkConfig().getJobName();
String nameSpace = kubernetesConfiguration.get("kubernetes.namespace");
String ingressDomain = checkUseIngress();
if (StringUtils.isNotEmpty(ingressDomain)) {
return StrFormatter.format("{}/{}/{}", ingressDomain, nameSpace, jobName);
}
return StringUtils.EMPTY;
}

private void initBase() {
flinkDeployment.setApiVersion("flink.apache.org/v1beta1");
flinkDeployment.setKind("FlinkDeployment");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export default (props: {
placeholder={l('rc.cc.flinkConfigPathPlaceholder')}
tooltip={l('rc.cc.flinkConfigPathHelp')}
/>
{type && type === ClusterType.KUBERNETES_APPLICATION && (
{type && (type === ClusterType.KUBERNETES_APPLICATION || type === ClusterType.KUBERNETES_OPERATOR) && (
<ProFormSwitch
name={['config', 'kubernetesConfig', 'ingressConfig', 'kubernetes.ingress.enabled']}
label={l('rc.cc.k8s.ingress.enabled')}
Expand All @@ -210,7 +210,7 @@ export default (props: {
unCheckedChildren={l('button.disable')}
/>
)}
{type && type === ClusterType.KUBERNETES_APPLICATION && ingressEnabled && (
{type && (type === ClusterType.KUBERNETES_APPLICATION || type === ClusterType.KUBERNETES_OPERATOR) && ingressEnabled && (
<ProFormText
tooltip={l('rc.cc.k8s.ingress.domainHelp')}
name={['config', 'kubernetesConfig', 'ingressConfig', 'kubernetes.ingress.domain']}
Expand Down

0 comments on commit bd04590

Please sign in to comment.