diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/base/pom.xml index f49388b5bae..9c738b7e69c 100644 --- a/inlong-manager/manager-plugins/base/pom.xml +++ b/inlong-manager/manager-plugins/base/pom.xml @@ -45,17 +45,6 @@ provided - - org.apache.inlong - manager-plugins-flink-v1.13 - ${project.version} - - - org.apache.inlong - manager-plugins-flink-v1.15 - ${project.version} - - org.projectlombok lombok @@ -116,4 +105,40 @@ + + + + v1.13 + + true + + + + org.apache.inlong + manager-plugins-flink-v1.13 + ${project.version} + + + + + v1.15 + + + org.apache.inlong + manager-plugins-flink-v1.15 + ${project.version} + + + + + v1.18 + + + org.apache.inlong + manager-plugins-flink-v1.18 + ${project.version} + + + + diff --git a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml index 2779612f404..bc9c164d6ad 100644 --- a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml +++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml @@ -52,6 +52,15 @@ + + + ../manager-plugins-flink-v1.18/target + ./ + + manager-plugins-flink-v1.18.jar + + + ../manager-plugins-flink-v1.13/target @@ -75,5 +84,17 @@ manager-plugins-flink-v1.15.jar + + + + ../manager-plugins-flink-v1.18/target + ./flink-v1.18 + + flink-*.jar + sort-flink-*.jar + scala-*.jar + manager-plugins-flink-v1.18.jar + + \ No newline at end of file diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml new file mode 100644 index 00000000000..6ad32208c54 --- /dev/null +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml @@ -0,0 +1,95 @@ + + + + 4.0.0 + + org.apache.inlong + manager-plugins + 1.13.0-SNAPSHOT + + + manager-plugins-flink-v1.18 + Apache InLong - Manager Plugins Flink v1.18 + + + ${project.parent.parent.parent.basedir} + 1.18.1 + + + + + org.apache.inlong + sort-flink-dependencies-v1.18 + ${project.version} + + + org.apache.flink + flink-file-sink-common + + + + + org.apache.flink + flink-table-api-java + ${flink.version} + + + org.apache.flink + flink-core + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-shaded-jackson + 2.15.3-18.0 + + + + org.projectlombok + lombok + + + + + manager-plugins-flink-v1.18 + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + + copy-dependencies + + package + + target/ + + + + + + + diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java new file mode 100644 index 00000000000..58094e9bf72 --- /dev/null +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.plugin.flink; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; + +import java.util.concurrent.CompletableFuture; + +/** + * Flink service, such as save or get flink config info, etc. + */ +@Slf4j +public class FlinkClientService { + + private final Configuration configuration; + private final RestClusterClient flinkClient; + + public FlinkClientService(Configuration configuration) throws Exception { + this.configuration = configuration; + this.flinkClient = getFlinkClient(); + } + + /** + * Get the Flink Client. + */ + public RestClusterClient getFlinkClient() throws Exception { + try { + return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + log.error("get flink client failed: ", e); + throw new Exception("get flink client failed: " + e.getMessage()); + } + } + + /** + * Get the job status by the given job id. + */ + public JobStatus getJobStatus(String jobId) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture jobStatus = flinkClient.getJobStatus(jobID); + return jobStatus.get(); + } catch (Exception e) { + log.error("get job status by jobId={} failed: ", jobId, e); + throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Get job detail by the given job id. + */ + public JobDetailsInfo getJobDetail(String jobId) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture jobDetails = flinkClient.getJobDetails(jobID); + return jobDetails.get(); + } catch (Exception e) { + log.error("get job detail by jobId={} failed: ", jobId, e); + throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Stop the Flink job with the savepoint. + */ + public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory, + SavepointFormatType.CANONICAL); + return stopResult.get(); + } catch (Exception e) { + log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e); + throw new Exception("stop job " + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Cancel the Flink job. + */ + public void cancelJob(String jobId) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + flinkClient.cancel(jobID); + } catch (Exception e) { + log.error("cancel job {} failed: ", jobId, e); + throw new Exception("cancel job " + jobId + " failed: " + e.getMessage()); + } + } +} diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml index 6d3e1cd764b..4bb92e96e08 100644 --- a/inlong-manager/manager-plugins/pom.xml +++ b/inlong-manager/manager-plugins/pom.xml @@ -29,8 +29,6 @@ Apache InLong - Manager Plugins - manager-plugins-flink-v1.13 - manager-plugins-flink-v1.15 base @@ -38,4 +36,35 @@ ${project.parent.parent.basedir} + + + flink-all-version + + true + + + manager-plugins-flink-v1.13 + manager-plugins-flink-v1.15 + manager-plugins-flink-v1.18 + + + + v1.13 + + manager-plugins-flink-v1.13 + + + + v1.15 + + manager-plugins-flink-v1.15 + + + + v1.18 + + manager-plugins-flink-v1.18 + + +