Skip to content

Commit

Permalink
[HUDI-6993] Support Flink 1.18 (#9949)
Browse files Browse the repository at this point in the history
* Address build failures in older Flink Versions
* Remove unnecessary dependency on flink-connector-hive
* Fix Flink 1.18 Validate-bundles

---------

Signed-off-by: Prabhu Joseph <[email protected]>
Co-authored-by: Prabhu Joseph <[email protected]>
Co-authored-by: root <[email protected]>
  • Loading branch information
3 people authored and HuangZhenQiu committed Aug 15, 2024
1 parent 4d32fe5 commit edab9a6
Show file tree
Hide file tree
Showing 53 changed files with 4,812 additions and 39 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ jobs:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
flinkProfile: "flink1.17"
flinkProfile: "flink1.18"

steps:
- uses: actions/checkout@v3
Expand Down Expand Up @@ -210,6 +210,7 @@ jobs:
- flinkProfile: "flink1.15"
- flinkProfile: "flink1.16"
- flinkProfile: "flink1.17"
- flinkProfile: "flink1.18"
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
Expand All @@ -234,7 +235,7 @@ jobs:
env:
SCALA_PROFILE: 'scala-2.12'
FLINK_PROFILE: ${{ matrix.flinkProfile }}
if: ${{ endsWith(env.FLINK_PROFILE, '1.17') }}
if: ${{ endsWith(env.FLINK_PROFILE, '1.18') }}
run: |
mvn clean install -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink $MVN_ARGS
Expand All @@ -244,7 +245,7 @@ jobs:
strategy:
matrix:
include:
- flinkProfile: 'flink1.17'
- flinkProfile: 'flink1.18'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'

Expand Down Expand Up @@ -272,9 +273,12 @@ jobs:
strategy:
matrix:
include:
- flinkProfile: 'flink1.17'
- flinkProfile: 'flink1.18'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'
- flinkProfile: 'flink1.18'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.2'
- flinkProfile: 'flink1.17'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.2'
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,15 @@ Starting from versions 0.11, Hudi no longer requires `spark-avro` to be specifie

### Build with different Flink versions

The default Flink version supported is 1.17. The default Flink 1.17.x version, corresponding to `flink1.17` profile is 1.17.0.
The default Flink version supported is 1.18. The default Flink 1.18.x version, corresponding to `flink1.18` profile is 1.18.0.
Flink is Scala-free since 1.15.x, there is no need to specify the Scala version for Flink 1.15.x and above versions.
Refer to the table below for building with different Flink and Scala versions.

| Maven build options | Expected Flink bundle jar name | Notes |
|:---------------------------|:-------------------------------|:---------------------------------|
| (empty) | hudi-flink1.17-bundle | For Flink 1.17 (default options) |
| `-Dflink1.17` | hudi-flink1.17-bundle | For Flink 1.17 (same as default) |
| (empty) | hudi-flink1.18-bundle | For Flink 1.18 (default options) |
| `-Dflink1.18` | hudi-flink1.18-bundle | For Flink 1.18 (same as default) |
| `-Dflink1.17` | hudi-flink1.17-bundle | For Flink 1.17 |
| `-Dflink1.16` | hudi-flink1.16-bundle | For Flink 1.16 |
| `-Dflink1.15` | hudi-flink1.15-bundle | For Flink 1.15 |
| `-Dflink1.14` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.12 |
Expand Down
7 changes: 5 additions & 2 deletions azure-pipelines-20230430.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

# NOTE:
# This config file defines how Azure CI runs tests with Spark 2.4 and Flink 1.17 profiles.
# This config file defines how Azure CI runs tests with Spark 2.4 and Flink 1.18 profiles.
# PRs will need to keep in sync with master's version to trigger the CI runs.

trigger:
Expand All @@ -37,6 +37,7 @@ parameters:
- 'hudi-flink-datasource/hudi-flink1.15.x'
- 'hudi-flink-datasource/hudi-flink1.16.x'
- 'hudi-flink-datasource/hudi-flink1.17.x'
- 'hudi-flink-datasource/hudi-flink1.18.x'
- name: job2Modules
type: object
default:
Expand Down Expand Up @@ -69,6 +70,7 @@ parameters:
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
- '!hudi-flink-datasource/hudi-flink1.17.x'
- '!hudi-flink-datasource/hudi-flink1.18.x'
- '!hudi-spark-datasource'
- '!hudi-spark-datasource/hudi-spark'
- '!hudi-spark-datasource/hudi-spark3.2.x'
Expand All @@ -92,9 +94,10 @@ parameters:
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
- '!hudi-flink-datasource/hudi-flink1.17.x'
- '!hudi-flink-datasource/hudi-flink1.18.x'

variables:
BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.17'
BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.18'
PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true -ntp -B -V -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn'
MVN_OPTS_INSTALL: '-Phudi-platform-service -DskipTests $(BUILD_PROFILES) $(PLUGIN_OPTS) -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=5'
MVN_OPTS_TEST: '-fae -Pwarn-log $(BUILD_PROFILES) $(PLUGIN_OPTS)'
Expand Down
1 change: 1 addition & 0 deletions hudi-flink-datasource/hudi-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.connector.kafka.artifactId}</artifactId>
<version>${flink.connector.kafka.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.adapter.HiveCatalogConstants.AlterHiveDatabaseOp;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
Expand Down Expand Up @@ -47,9 +48,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
Expand Down Expand Up @@ -107,17 +105,20 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.hudi.adapter.HiveCatalogConstants.ALTER_DATABASE_OP;
import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_LOCATION_URI;
import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_OWNER_NAME;
import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_OWNER_TYPE;
import static org.apache.hudi.adapter.HiveCatalogConstants.ROLE_OWNER;
import static org.apache.hudi.adapter.HiveCatalogConstants.USER_OWNER;
import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

/**
* A catalog implementation for Hoodie based on MetaStore.
Expand Down Expand Up @@ -219,7 +220,7 @@ public CatalogDatabase getDatabase(String databaseName)

Map<String, String> properties = new HashMap<>(hiveDatabase.getParameters());

properties.put(SqlCreateHiveDatabase.DATABASE_LOCATION_URI, hiveDatabase.getLocationUri());
properties.put(DATABASE_LOCATION_URI, hiveDatabase.getLocationUri());

return new CatalogDatabaseImpl(properties, hiveDatabase.getDescription());
}
Expand Down Expand Up @@ -248,7 +249,7 @@ public void createDatabase(

Map<String, String> properties = database.getProperties();

String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
String dbLocationUri = properties.remove(DATABASE_LOCATION_URI);
if (dbLocationUri == null && this.catalogPath != null) {
// infer default location uri
dbLocationUri = new Path(this.catalogPath, databaseName).toString();
Expand Down Expand Up @@ -318,11 +319,10 @@ private static Database alterDatabase(Database hiveDB, CatalogDatabase newDataba
String opStr = newParams.remove(ALTER_DATABASE_OP);
if (opStr == null) {
// by default is to alter db properties
opStr = SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_PROPS.name();
opStr = AlterHiveDatabaseOp.CHANGE_PROPS.name();
}
String newLocation = newParams.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
SqlAlterHiveDatabase.AlterHiveDatabaseOp op =
SqlAlterHiveDatabase.AlterHiveDatabaseOp.valueOf(opStr);
String newLocation = newParams.remove(DATABASE_LOCATION_URI);
AlterHiveDatabaseOp op = AlterHiveDatabaseOp.valueOf(opStr);
switch (op) {
case CHANGE_PROPS:
hiveDB.setParameters(newParams);
Expand All @@ -335,10 +335,10 @@ private static Database alterDatabase(Database hiveDB, CatalogDatabase newDataba
String ownerType = newParams.remove(DATABASE_OWNER_TYPE);
hiveDB.setOwnerName(ownerName);
switch (ownerType) {
case SqlAlterHiveDatabaseOwner.ROLE_OWNER:
case ROLE_OWNER:
hiveDB.setOwnerType(PrincipalType.ROLE);
break;
case SqlAlterHiveDatabaseOwner.USER_OWNER:
case USER_OWNER:
hiveDB.setOwnerType(PrincipalType.USER);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;

/**
* Constants for Hive Catalog.
*/
/**
 * Version-adapter constants for the Hive catalog.
 *
 * <p>Re-exports keys from Flink's Hive DDL parser classes so hudi-flink
 * catalog code can reference them through {@code org.apache.hudi.adapter}
 * instead of depending on the Flink-version-specific parser package directly.
 */
public class HiveCatalogConstants {

  // -----------------------------------------------------------------------------------
  // Constants for ALTER DATABASE
  // -----------------------------------------------------------------------------------

  /** Database parameter key naming the ALTER DATABASE operation to perform. */
  public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;

  /** Database property key holding the database location URI. */
  public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;

  /** Database parameter key holding the owner's name. */
  public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;

  /** Database parameter key holding the owner's type (role or user). */
  public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;

  /** {@link #DATABASE_OWNER_TYPE} value indicating a Hive role owns the database. */
  public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;

  /** {@link #DATABASE_OWNER_TYPE} value indicating a user owns the database. */
  public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;

  /** Type of ALTER DATABASE operation. */
  public enum AlterHiveDatabaseOp {
    CHANGE_PROPS,
    CHANGE_LOCATION,
    CHANGE_OWNER
  }

  /** Non-instantiable: this class only holds constants. */
  private HiveCatalogConstants() {
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;

/**
* Constants for Hive Catalog.
*/
/**
 * Version-adapter constants for the Hive catalog.
 *
 * <p>Re-exports keys from Flink's Hive DDL parser classes so hudi-flink
 * catalog code can reference them through {@code org.apache.hudi.adapter}
 * instead of depending on the Flink-version-specific parser package directly.
 */
public class HiveCatalogConstants {

  // -----------------------------------------------------------------------------------
  // Constants for ALTER DATABASE
  // -----------------------------------------------------------------------------------

  /** Database parameter key naming the ALTER DATABASE operation to perform. */
  public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;

  /** Database property key holding the database location URI. */
  public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;

  /** Database parameter key holding the owner's name. */
  public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;

  /** Database parameter key holding the owner's type (role or user). */
  public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;

  /** {@link #DATABASE_OWNER_TYPE} value indicating a Hive role owns the database. */
  public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;

  /** {@link #DATABASE_OWNER_TYPE} value indicating a user owns the database. */
  public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;

  /** Type of ALTER DATABASE operation. */
  public enum AlterHiveDatabaseOp {
    CHANGE_PROPS,
    CHANGE_LOCATION,
    CHANGE_OWNER
  }

  /** Non-instantiable: this class only holds constants. */
  private HiveCatalogConstants() {
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;

/**
* Constants for Hive Catalog.
*/
/**
 * Version-adapter constants for the Hive catalog.
 *
 * <p>Re-exports keys from Flink's Hive DDL parser classes so hudi-flink
 * catalog code can reference them through {@code org.apache.hudi.adapter}
 * instead of depending on the Flink-version-specific parser package directly.
 */
public class HiveCatalogConstants {

  // -----------------------------------------------------------------------------------
  // Constants for ALTER DATABASE
  // -----------------------------------------------------------------------------------

  /** Database parameter key naming the ALTER DATABASE operation to perform. */
  public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;

  /** Database property key holding the database location URI. */
  public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;

  /** Database parameter key holding the owner's name. */
  public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;

  /** Database parameter key holding the owner's type (role or user). */
  public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;

  /** {@link #DATABASE_OWNER_TYPE} value indicating a Hive role owns the database. */
  public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;

  /** {@link #DATABASE_OWNER_TYPE} value indicating a user owns the database. */
  public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;

  /** Type of ALTER DATABASE operation. */
  public enum AlterHiveDatabaseOp {
    CHANGE_PROPS,
    CHANGE_LOCATION,
    CHANGE_OWNER
  }

  /** Non-instantiable: this class only holds constants. */
  private HiveCatalogConstants() {
  }
}

Loading

0 comments on commit edab9a6

Please sign in to comment.