Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… #3194

Merged
merged 6 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion flink-cdc-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ limitations under the License.

<properties>
<commons-cli.version>1.6.0</commons-cli.version>
<snakeyaml.version>2.6</snakeyaml.version>
</properties>

<dependencies>
Expand All @@ -37,7 +38,12 @@ limitations under the License.
<artifactId>flink-cdc-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- YAML parser utilities -->
<dependency>
<groupId>org.snakeyaml</groupId>
<artifactId>snakeyaml-engine</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private static Configuration getGlobalConfig(CommandLine commandLine) throws Exc
if (globalConfig != null) {
Path globalConfigPath = Paths.get(globalConfig);
LOG.info("Using global config in command line: {}", globalConfigPath);
return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath);
return ConfigurationUtils.loadConfigFile(globalConfigPath);
}

// Fallback to Flink CDC home
Expand All @@ -178,7 +178,7 @@ private static Configuration getGlobalConfig(CommandLine commandLine) throws Exc
Path globalConfigPath =
Paths.get(flinkCdcHome).resolve("conf").resolve("flink-cdc.yaml");
LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath);
return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath);
return ConfigurationUtils.loadConfigFile(globalConfigPath);
}

// Fallback to empty configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,41 @@

import org.apache.flink.cdc.common.configuration.Configuration;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Utilities for handling {@link Configuration}. */
public class ConfigurationUtils {
public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
if (!Files.exists(configPath)) {
throw new FileNotFoundException(
String.format("Cannot find configuration file at \"%s\"", configPath));
}
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {
Map<String, String> configMap =
mapper.readValue(
configPath.toFile(), new TypeReference<Map<String, String>>() {});
return Configuration.fromMap(configMap);
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Failed to load config file \"%s\" to key-value pairs", configPath),
e);
}

private static final String KEY_SEPARATOR = ".";

public static Configuration loadConfigFile(Path configPath) throws Exception {
Map<String, Object> configMap = YamlParserUtils.loadYamlFile(configPath.toFile());
return Configuration.fromMap(flattenConfigMap(configMap, ""));
}

@SuppressWarnings("unchecked")
private static Map<String, String> flattenConfigMap(
Map<String, Object> config, String keyPrefix) {
final Map<String, String> flattenedMap = new HashMap<>();

config.forEach(
(key, value) -> {
String flattenedKey = keyPrefix + key;
if (value instanceof Map) {
Map<String, Object> e = (Map<String, Object>) value;
flattenedMap.putAll(flattenConfigMap(e, flattenedKey + KEY_SEPARATOR));
} else {
if (value instanceof List) {
flattenedMap.put(flattenedKey, YamlParserUtils.toYAMLString(value));
} else {
flattenedMap.put(flattenedKey, value.toString());
}
}
});

return flattenedMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,33 @@
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.List;

/** Utilities for handling Flink configuration and environment. */
public class FlinkEnvironmentUtils {

private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
private static final String FLINK_CONF_DIR = "conf";
private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
private static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
private static final String FLINK_CONF_FILENAME = "config.yaml";

public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
try {
return ConfigurationUtils.loadConfigFile(flinkConfPath);
} catch (FileNotFoundException e) {
LOG.warn(
"Failed to load the configuration file from {}. Trying to use legacy YAML parser to load flink configuration file from {}.",
FLINK_CONF_FILENAME,
LEGACY_FLINK_CONF_FILENAME);
return ConfigurationUtils.loadConfigFile(
flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME));
}
}

public static FlinkPipelineComposer createComposer(
Expand Down
Loading
Loading