Skip to content

Commit

Permalink
[FLINK-34969][cdc][cli] Add support for both new and old Flink config…
Browse files Browse the repository at this point in the history
… files in Flink CDC (#3194)
  • Loading branch information
skymilong authored Apr 23, 2024
1 parent 313726b commit 2103226
Show file tree
Hide file tree
Showing 7 changed files with 478 additions and 28 deletions.
8 changes: 7 additions & 1 deletion flink-cdc-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ limitations under the License.

<properties>
<commons-cli.version>1.6.0</commons-cli.version>
<snakeyaml.version>2.6</snakeyaml.version>
</properties>

<dependencies>
Expand All @@ -37,7 +38,12 @@ limitations under the License.
<artifactId>flink-cdc-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- YAML parser utilities -->
<dependency>
<groupId>org.snakeyaml</groupId>
<artifactId>snakeyaml-engine</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private static Configuration getGlobalConfig(CommandLine commandLine) throws Exc
if (globalConfig != null) {
Path globalConfigPath = Paths.get(globalConfig);
LOG.info("Using global config in command line: {}", globalConfigPath);
return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath);
return ConfigurationUtils.loadConfigFile(globalConfigPath);
}

// Fallback to Flink CDC home
Expand All @@ -178,7 +178,7 @@ private static Configuration getGlobalConfig(CommandLine commandLine) throws Exc
Path globalConfigPath =
Paths.get(flinkCdcHome).resolve("conf").resolve("flink-cdc.yaml");
LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath);
return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath);
return ConfigurationUtils.loadConfigFile(globalConfigPath);
}

// Fallback to empty configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,41 @@

import org.apache.flink.cdc.common.configuration.Configuration;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Utilities for handling {@link Configuration}. */
public class ConfigurationUtils {
public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
if (!Files.exists(configPath)) {
throw new FileNotFoundException(
String.format("Cannot find configuration file at \"%s\"", configPath));
}
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {
Map<String, String> configMap =
mapper.readValue(
configPath.toFile(), new TypeReference<Map<String, String>>() {});
return Configuration.fromMap(configMap);
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Failed to load config file \"%s\" to key-value pairs", configPath),
e);
}

private static final String KEY_SEPARATOR = ".";

public static Configuration loadConfigFile(Path configPath) throws Exception {
Map<String, Object> configMap = YamlParserUtils.loadYamlFile(configPath.toFile());
return Configuration.fromMap(flattenConfigMap(configMap, ""));
}

@SuppressWarnings("unchecked")
private static Map<String, String> flattenConfigMap(
Map<String, Object> config, String keyPrefix) {
final Map<String, String> flattenedMap = new HashMap<>();

config.forEach(
(key, value) -> {
String flattenedKey = keyPrefix + key;
if (value instanceof Map) {
Map<String, Object> e = (Map<String, Object>) value;
flattenedMap.putAll(flattenConfigMap(e, flattenedKey + KEY_SEPARATOR));
} else {
if (value instanceof List) {
flattenedMap.put(flattenedKey, YamlParserUtils.toYAMLString(value));
} else {
flattenedMap.put(flattenedKey, value.toString());
}
}
});

return flattenedMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,33 @@
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.List;

/** Utilities for handling Flink configuration and environment. */
public class FlinkEnvironmentUtils {

private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
private static final String FLINK_CONF_DIR = "conf";
private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
private static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
private static final String FLINK_CONF_FILENAME = "config.yaml";

public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
try {
return ConfigurationUtils.loadConfigFile(flinkConfPath);
} catch (FileNotFoundException e) {
LOG.warn(
"Failed to load the configuration file from {}. Trying to use legacy YAML parser to load flink configuration file from {}.",
FLINK_CONF_FILENAME,
LEGACY_FLINK_CONF_FILENAME);
return ConfigurationUtils.loadConfigFile(
flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME));
}
}

public static FlinkPipelineComposer createComposer(
Expand Down
Loading

0 comments on commit 2103226

Please sign in to comment.