Skip to content

Commit

Permalink
[FLINK-35109] Bump to Flink 1.19 and support Flink 1.20
Browse files Browse the repository at this point in the history
Also
- adds the migration support tests up to 1.20.
- bumps Kafka-client to 3.6.2
  • Loading branch information
AHeise committed Oct 10, 2024
1 parent 429fe0c commit aedc790
Show file tree
Hide file tree
Showing 41 changed files with 293 additions and 54 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [ 1.18.1 ]
jdk: [ '8, 11, 17' ]
flink: [ 1.19.1 ]
jdk: [ '8, 11, 17, 21' ]
include:
- flink: 1.19.0
- flink: 1.20.0
jdk: '8, 11, 17, 21'
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
Expand All @@ -40,7 +40,7 @@ jobs:
python_test:
strategy:
matrix:
flink: [ 1.18.1, 1.19.0 ]
flink: [ 1.19.1, 1.20.0 ]
uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
21 changes: 4 additions & 17 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,21 @@ jobs:
strategy:
matrix:
flink_branches: [{
flink: 1.18-SNAPSHOT,
branch: main
}, {
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
}, {
flink: 1.20-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
}, {
flink: 1.18.1,
flink: 1.19.1,
branch: v3.2
}, {
flink: 1.19.0,
branch: v3.2,
jdk: '8, 11, 17, 21',
}, {
flink: 1.18.1,
branch: v3.1
}, {
flink: 1.19.0,
branch: v3.1,
jdk: '8, 11, 17, 21',
flink: 1.20.0,
branch: v3.2
}]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink_branches.flink }}
connector_branch: ${{ matrix.flink_branches.branch }}
jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17' }}
jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }}
run_dependency_convergence: false
12 changes: 7 additions & 5 deletions flink-connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ under the License.
FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.lang=ALL-UNNAMED <!--
FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED <!--
FlinkKafkaConsumerBaseTest --> --add-opens=java.base/java.util=ALL-UNNAMED <!--
KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED </flink.connector.module.config>
KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
</flink.connector.module.config>
</properties>

<dependencies>
Expand Down Expand Up @@ -81,10 +82,10 @@ under the License.
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- Tests -->

Expand Down Expand Up @@ -211,6 +212,7 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {

@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<FlinkVersion> parameters() {
return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.current());
}

public FlinkKafkaConsumerBaseMigrationTest(FlinkVersion testMigrateVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<FlinkVersion> parameters() {
return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.current());
}

public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerMatchers;
import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerUpgradeTestBase;

import org.hamcrest.Matcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Before;
Expand Down Expand Up @@ -65,7 +64,6 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

// ---------- Write the Debezium json into Kafka -------------------
List<String> lines = readLines("debezium-data-schema-exclude.txt");
Expand Down Expand Up @@ -194,7 +192,6 @@ public void testKafkaCanalChangelogSource() throws Exception {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

// ---------- Write the Canal json into Kafka -------------------
List<String> lines = readLines("canal-data.txt");
Expand Down Expand Up @@ -335,7 +332,6 @@ public void testKafkaMaxwellChangelogSource() throws Exception {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

// ---------- Write the Maxwell json into Kafka -------------------
List<String> lines = readLines("maxwell-data.txt");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka.testutils;

import java.io.Closeable;

/**
* Utility class to temporarily use a different classloader as the thread context classloader.
*
* <p>Temporarily copied from flink-core to avoid dependency on flink-core.
*/
public class ThreadContextClassLoader implements Closeable {

private final ClassLoader originalThreadContextClassLoader;

public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) {
this.originalThreadContextClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(newThreadContextClassLoader);
}

@Override
public void close() {
Thread.currentThread().setContextClassLoader(originalThreadContextClassLoader);
}
}
Loading

0 comments on commit aedc790

Please sign in to comment.