Skip to content

Commit

Permalink
[INLONG-9402][Sort] Add version class to compatible.
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoYou201 committed Apr 8, 2024
1 parent 2c68864 commit 881a09d
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
Expand Down Expand Up @@ -62,7 +62,7 @@
/**
* End-to-end tests for sort-connector-kafka uber jar.
*/
public class Kafka2StarRocksTest extends FlinkContainerTestEnv {
public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 {

private static final Logger LOG = LoggerFactory.getLogger(Kafka2StarRocksTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;
Expand Down Expand Up @@ -60,7 +60,7 @@
* End-to-end tests for sort-connector-mongodb-cdc-v1.15 uber jar.
* Test flink sql Mongodb cdc to StarRocks
*/
public class Mongodb2StarRocksTest extends FlinkContainerTestEnv {
public class Mongodb2StarRocksTest extends FlinkContainerTestEnvJRE8 {

private static final Logger LOG = LoggerFactory.getLogger(Mongodb2StarRocksTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
Expand Down Expand Up @@ -51,7 +51,7 @@
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Mysql cdc to StarRocks
*/
public class Mysql2StarRocksTest extends FlinkContainerTestEnv {
public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 {

private static final Logger LOG = LoggerFactory.getLogger(Mysql2StarRocksTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE11;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;
Expand Down Expand Up @@ -49,7 +49,7 @@
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Postgres cdc to StarRocks
*/
public class Postgres2StarRocksTest extends FlinkContainerTestEnv {
public class Postgres2StarRocksTest extends FlinkContainerTestEnvJRE11 {

private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class);
private static final Logger LOG = LoggerFactory.getLogger(Postgres2StarRocksTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.RedisContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;

Expand All @@ -40,7 +40,7 @@
import static org.junit.Assert.assertEquals;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

public class RedisToRedisTest extends FlinkContainerTestEnv {
public class RedisToRedisTest extends FlinkContainerTestEnvJRE8 {

private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
private static final Path redisJar = TestUtils.getResource("sort-connector-redis.jar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MSSQLServerContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
Expand Down Expand Up @@ -48,7 +48,7 @@
import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;

public class Sqlserver2StarRocksTest extends FlinkContainerTestEnv {
public class Sqlserver2StarRocksTest extends FlinkContainerTestEnvJRE8 {

private static final Logger LOG = LoggerFactory.getLogger(Sqlserver2StarRocksTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
Expand All @@ -39,9 +38,7 @@
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startables;

import javax.annotation.Nullable;

Expand All @@ -62,7 +59,6 @@
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkState;

Expand All @@ -72,20 +68,20 @@
*/
public abstract class FlinkContainerTestEnv extends TestLogger {

private static final Logger JM_LOG = LoggerFactory.getLogger(JobMaster.class);
private static final Logger TM_LOG = LoggerFactory.getLogger(TaskExecutor.class);
private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnv.class);
static final Logger JM_LOG = LoggerFactory.getLogger(JobMaster.class);
static final Logger TM_LOG = LoggerFactory.getLogger(TaskExecutor.class);
static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnv.class);

private static final Path SORT_DIST_JAR = TestUtils.getResource("sort-dist.jar");
// ------------------------------------------------------------------------------------------
// Flink Variables
// ------------------------------------------------------------------------------------------
private static final int JOB_MANAGER_REST_PORT = 8081;
private static final int DEBUG_PORT = 20000;
private static final String FLINK_BIN = "bin";
private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
private static final String FLINK_PROPERTIES = String.join("\n", Arrays.asList(
static final int JOB_MANAGER_REST_PORT = 8081;
static final int DEBUG_PORT = 20000;
static final String FLINK_BIN = "bin";
static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
static final String FLINK_PROPERTIES = String.join("\n", Arrays.asList(
"jobmanager.rpc.address: jobmanager",
"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
Expand All @@ -104,35 +100,8 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
@Nullable
private static RestClusterClient<StandaloneClusterId> restClusterClient;

private static GenericContainer<?> jobManager;
private static GenericContainer<?> taskManager;

@BeforeClass
public static void before() {
LOG.info("Starting containers...");
jobManager =
new GenericContainer<>("flink:1.15.4-scala_2.12-java8")
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withLogConsumer(new Slf4jLogConsumer(JM_LOG));
taskManager =
new GenericContainer<>("flink:1.15.4-scala_2.12-java8")
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withExposedPorts(DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(TM_LOG));

Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
LOG.info("Containers are started.");
}
static GenericContainer<?> jobManager;
static GenericContainer<?> taskManager;

@AfterClass
public static void after() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests.utils;

import org.junit.BeforeClass;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.util.stream.Stream;

public abstract class FlinkContainerTestEnvJRE11 extends FlinkContainerTestEnv {

@BeforeClass
public static void before() {
LOG.info("Starting containers...");
jobManager =
new GenericContainer<>("flink:1.15.4-scala_2.12")
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withLogConsumer(new Slf4jLogConsumer(JM_LOG));
taskManager =
new GenericContainer<>("flink:1.15.4-scala_2.12")
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withExposedPorts(DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(TM_LOG));

Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
LOG.info("Containers are started.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests.utils;

import org.junit.BeforeClass;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.util.stream.Stream;

public abstract class FlinkContainerTestEnvJRE8 extends FlinkContainerTestEnv {

@BeforeClass
public static void before() {
LOG.info("Starting containers...");
jobManager =
new GenericContainer<>("flink:1.15.4-scala_2.12-java8")
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withLogConsumer(new Slf4jLogConsumer(JM_LOG));
taskManager =
new GenericContainer<>("flink:1.15.4-scala_2.12-java8")
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withExposedPorts(DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(TM_LOG));

Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
LOG.info("Containers are started.");
}
}

0 comments on commit 881a09d

Please sign in to comment.