From 881a09d52be4b328e9727fc473305e80755e7cfc Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Tue, 9 Apr 2024 05:16:27 +0800 Subject: [PATCH] [INLONG-9402][Sort] Add version class to compatible. --- .../sort/tests/Kafka2StarRocksTest.java | 4 +- .../sort/tests/Mongodb2StarRocksTest.java | 4 +- .../sort/tests/Mysql2StarRocksTest.java | 4 +- .../sort/tests/Postgres2StarRocksTest.java | 4 +- .../inlong/sort/tests/RedisToRedisTest.java | 4 +- .../sort/tests/Sqlserver2StarRocksTest.java | 4 +- .../tests/utils/FlinkContainerTestEnv.java | 53 ++++-------------- .../utils/FlinkContainerTestEnvJRE11.java | 55 +++++++++++++++++++ .../utils/FlinkContainerTestEnvJRE8.java | 55 +++++++++++++++++++ 9 files changed, 133 insertions(+), 54 deletions(-) create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java index 9698711825b..70e6b2413ed 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.tests; -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MySqlContainer; import org.apache.inlong.sort.tests.utils.PlaceholderResolver; @@ -62,7 +62,7 @@ /** * End-to-end tests for sort-connector-kafka uber jar. */ -public class Kafka2StarRocksTest extends FlinkContainerTestEnv { +public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 { private static final Logger LOG = LoggerFactory.getLogger(Kafka2StarRocksTest.class); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java index ef3a1dbe29e..93a8956a7a1 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.tests; -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -60,7 +60,7 @@ * End-to-end tests for sort-connector-mongodb-cdc-v1.15 uber jar. * Test flink sql Mongodb cdc to StarRocks */ -public class Mongodb2StarRocksTest extends FlinkContainerTestEnv { +public class Mongodb2StarRocksTest extends FlinkContainerTestEnvJRE8 { private static final Logger LOG = LoggerFactory.getLogger(Mongodb2StarRocksTest.class); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java index 06b7034fae5..6b7a5aa644e 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.tests; -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MySqlContainer; import org.apache.inlong.sort.tests.utils.StarRocksContainer; @@ -51,7 +51,7 @@ * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. * Test flink sql Mysql cdc to StarRocks */ -public class Mysql2StarRocksTest extends FlinkContainerTestEnv { +public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 { private static final Logger LOG = LoggerFactory.getLogger(Mysql2StarRocksTest.class); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java index 3f1d4f45658..95a2a2e5aa7 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.tests; -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE11; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -49,7 +49,7 @@ * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. * Test flink sql Postgres cdc to StarRocks */ -public class Postgres2StarRocksTest extends FlinkContainerTestEnv { +public class Postgres2StarRocksTest extends FlinkContainerTestEnvJRE11 { private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class); private static final Logger LOG = LoggerFactory.getLogger(Postgres2StarRocksTest.class); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java index 2332f2dcb84..912a37f5b3e 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.tests; -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.RedisContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; -public class RedisToRedisTest extends FlinkContainerTestEnv { +public class RedisToRedisTest extends FlinkContainerTestEnvJRE8 { private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); private static final Path redisJar = TestUtils.getResource("sort-connector-redis.jar"); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java index 2204c8b9e00..1d983f9341a 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.tests; -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MSSQLServerContainer; import org.apache.inlong.sort.tests.utils.StarRocksContainer; @@ -48,7 +48,7 @@ import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName; import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; -public class Sqlserver2StarRocksTest extends FlinkContainerTestEnv { +public class Sqlserver2StarRocksTest extends FlinkContainerTestEnvJRE8 { private static final Logger LOG = LoggerFactory.getLogger(Sqlserver2StarRocksTest.class); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java index ca1319ad78d..de6166442ea 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java @@ -30,7 +30,6 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.rules.TemporaryFolder; @@ -39,9 +38,7 @@ import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.images.builder.Transferable; -import org.testcontainers.lifecycle.Startables; import javax.annotation.Nullable; @@ -62,7 +59,6 @@ import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.JarOutputStream; -import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkState; @@ -72,20 +68,20 @@ */ public abstract class FlinkContainerTestEnv extends TestLogger { - private static final Logger JM_LOG = LoggerFactory.getLogger(JobMaster.class); - private static final Logger TM_LOG = LoggerFactory.getLogger(TaskExecutor.class); - private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnv.class); + static final Logger JM_LOG = LoggerFactory.getLogger(JobMaster.class); + static final Logger TM_LOG = LoggerFactory.getLogger(TaskExecutor.class); + static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnv.class); private static final Path SORT_DIST_JAR = TestUtils.getResource("sort-dist.jar"); // ------------------------------------------------------------------------------------------ // Flink Variables // ------------------------------------------------------------------------------------------ - private static final int JOB_MANAGER_REST_PORT = 8081; - private static final int DEBUG_PORT = 20000; - private static final String FLINK_BIN = "bin"; - private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; - private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; - private static final String FLINK_PROPERTIES = String.join("\n", Arrays.asList( + static final int JOB_MANAGER_REST_PORT = 8081; + static final int DEBUG_PORT = 20000; + static final String FLINK_BIN = "bin"; + static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; + static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; + static final String FLINK_PROPERTIES = String.join("\n", Arrays.asList( "jobmanager.rpc.address: jobmanager", "taskmanager.numberOfTaskSlots: 10", "parallelism.default: 4", @@ -104,35 +100,8 @@ public abstract class FlinkContainerTestEnv extends TestLogger { @Nullable private static RestClusterClient restClusterClient; - private static GenericContainer jobManager; - private static GenericContainer taskManager; - - @BeforeClass - public static void before() { - LOG.info("Starting containers..."); - jobManager = - new GenericContainer<>("flink:1.15.4-scala_2.12-java8") - .withCommand("jobmanager") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) - .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .withExposedPorts(JOB_MANAGER_REST_PORT) - .withLogConsumer(new Slf4jLogConsumer(JM_LOG)); - taskManager = - new GenericContainer<>("flink:1.15.4-scala_2.12-java8") - .withCommand("taskmanager") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) - .withExposedPorts(DEBUG_PORT) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .dependsOn(jobManager) - .withLogConsumer(new Slf4jLogConsumer(TM_LOG)); - - Startables.deepStart(Stream.of(jobManager)).join(); - Startables.deepStart(Stream.of(taskManager)).join(); - LOG.info("Containers are started."); - } + static GenericContainer jobManager; + static GenericContainer taskManager; @AfterClass public static void after() { diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java new file mode 100644 index 00000000000..f12916e3526 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.util.stream.Stream; + +public abstract class FlinkContainerTestEnvJRE11 extends FlinkContainerTestEnv { + + @BeforeClass + public static void before() { + LOG.info("Starting containers..."); + jobManager = + new GenericContainer<>("flink:1.15.4-scala_2.12") + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withLogConsumer(new Slf4jLogConsumer(JM_LOG)); + taskManager = + new GenericContainer<>("flink:1.15.4-scala_2.12") + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withExposedPorts(DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withLogConsumer(new Slf4jLogConsumer(TM_LOG)); + + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("Containers are started."); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java new file mode 100644 index 00000000000..a59d9c9e982 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.util.stream.Stream; + +public abstract class FlinkContainerTestEnvJRE8 extends FlinkContainerTestEnv { + + @BeforeClass + public static void before() { + LOG.info("Starting containers..."); + jobManager = + new GenericContainer<>("flink:1.15.4-scala_2.12-java8") + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withLogConsumer(new Slf4jLogConsumer(JM_LOG)); + taskManager = + new GenericContainer<>("flink:1.15.4-scala_2.12-java8") + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withExposedPorts(DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withLogConsumer(new Slf4jLogConsumer(TM_LOG)); + + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("Containers are started."); + } +}