Skip to content

Commit

Permalink
[INLONG-10056][Manager] Support new manager plugin for flink 1.18 (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang authored Apr 26, 2024
1 parent 721d61a commit b0357fa
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 13 deletions.
47 changes: 36 additions & 11 deletions inlong-manager/manager-plugins/base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-plugins-flink-v1.13</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-plugins-flink-v1.15</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down Expand Up @@ -116,4 +105,40 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>v1.13</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-plugins-flink-v1.13</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>v1.15</id>
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-plugins-flink-v1.15</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>v1.18</id>
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-plugins-flink-v1.18</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
21 changes: 21 additions & 0 deletions inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@
</includes>
</fileSet>

<!-- Plugins Flink v1.18 -->
<fileSet>
<directory>../manager-plugins-flink-v1.18/target</directory>
<outputDirectory>./</outputDirectory>
<includes>
<include>manager-plugins-flink-v1.18.jar</include>
</includes>
</fileSet>

<!-- Flink v1.13 dependencies -->
<fileSet>
<directory>../manager-plugins-flink-v1.13/target</directory>
Expand All @@ -75,5 +84,17 @@
<include>manager-plugins-flink-v1.15.jar</include>
</includes>
</fileSet>

<!-- Flink v1.18 dependencies -->
<fileSet>
<directory>../manager-plugins-flink-v1.18/target</directory>
<outputDirectory>./flink-v1.18</outputDirectory>
<includes>
<include>flink-*.jar</include>
<include>sort-flink-*.jar</include>
<include>scala-*.jar</include>
<include>manager-plugins-flink-v1.18.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>
95 changes: 95 additions & 0 deletions inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-plugins</artifactId>
<version>1.13.0-SNAPSHOT</version>
</parent>

<artifactId>manager-plugins-flink-v1.18</artifactId>
<name>Apache InLong - Manager Plugins Flink v1.18</name>

<properties>
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
<flink.version>1.18.1</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-flink-dependencies-v1.18</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-file-sink-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>2.15.3-18.0</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

<build>
<finalName>manager-plugins-flink-v1.18</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<goals>
<goal>copy-dependencies</goal>
</goals>
<phase>package</phase>
<configuration>
<outputDirectory>target/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.plugin.flink;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

import java.util.concurrent.CompletableFuture;

/**
* Flink service, such as save or get flink config info, etc.
*/
@Slf4j
public class FlinkClientService {

private final Configuration configuration;
private final RestClusterClient<StandaloneClusterId> flinkClient;

public FlinkClientService(Configuration configuration) throws Exception {
this.configuration = configuration;
this.flinkClient = getFlinkClient();
}

/**
* Get the Flink Client.
*/
public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception {
try {
return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
} catch (Exception e) {
log.error("get flink client failed: ", e);
throw new Exception("get flink client failed: " + e.getMessage());
}
}

/**
* Get the job status by the given job id.
*/
public JobStatus getJobStatus(String jobId) throws Exception {
try {
JobID jobID = JobID.fromHexString(jobId);
CompletableFuture<JobStatus> jobStatus = flinkClient.getJobStatus(jobID);
return jobStatus.get();
} catch (Exception e) {
log.error("get job status by jobId={} failed: ", jobId, e);
throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage());
}
}

/**
* Get job detail by the given job id.
*/
public JobDetailsInfo getJobDetail(String jobId) throws Exception {
try {
JobID jobID = JobID.fromHexString(jobId);
CompletableFuture<JobDetailsInfo> jobDetails = flinkClient.getJobDetails(jobID);
return jobDetails.get();
} catch (Exception e) {
log.error("get job detail by jobId={} failed: ", jobId, e);
throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage());
}
}

/**
* Stop the Flink job with the savepoint.
*/
public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception {
try {
JobID jobID = JobID.fromHexString(jobId);
CompletableFuture<String> stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory,
SavepointFormatType.CANONICAL);
return stopResult.get();
} catch (Exception e) {
log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e);
throw new Exception("stop job " + jobId + " failed: " + e.getMessage());
}
}

/**
* Cancel the Flink job.
*/
public void cancelJob(String jobId) throws Exception {
try {
JobID jobID = JobID.fromHexString(jobId);
flinkClient.cancel(jobID);
} catch (Exception e) {
log.error("cancel job {} failed: ", jobId, e);
throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());
}
}
}
33 changes: 31 additions & 2 deletions inlong-manager/manager-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,42 @@
<name>Apache InLong - Manager Plugins</name>

<modules>
<module>manager-plugins-flink-v1.13</module>
<module>manager-plugins-flink-v1.15</module>
<module>base</module>
</modules>

<properties>
<inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
</properties>

<profiles>
<profile>
<id>flink-all-version</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<modules>
<module>manager-plugins-flink-v1.13</module>
<module>manager-plugins-flink-v1.15</module>
<module>manager-plugins-flink-v1.18</module>
</modules>
</profile>
<profile>
<id>v1.13</id>
<modules>
<module>manager-plugins-flink-v1.13</module>
</modules>
</profile>
<profile>
<id>v1.15</id>
<modules>
<module>manager-plugins-flink-v1.15</module>
</modules>
</profile>
<profile>
<id>v1.18</id>
<modules>
<module>manager-plugins-flink-v1.18</module>
</modules>
</profile>
</profiles>
</project>

0 comments on commit b0357fa

Please sign in to comment.