Skip to content

Commit

Permalink
SNOW-1571459 parse jdbc properties (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bzabek authored Aug 13, 2024
1 parent 05c1148 commit 3dabfc0
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public class SnowflakeSinkConnectorConfig {
// JDBC trace Info (environment variable)
public static final String SNOWFLAKE_JDBC_TRACE = "JDBC_TRACE";

// JDBC properties map
public static final String SNOWFLAKE_JDBC_MAP = "snowflake.jdbc.map";

// Snowflake Metadata Flags
private static final String SNOWFLAKE_METADATA_FLAGS = "Snowflake Metadata Flags";
public static final String SNOWFLAKE_METADATA_CREATETIME = "snowflake.metadata.createtime";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,17 @@ protected static Properties generateProxyParametersIfRequired(Map<String, String
return proxyProperties;
}

/**
 * Reads the optional {@code snowflake.jdbc.map} connector setting and converts it into JDBC
 * {@link Properties}.
 *
 * @param conf raw connector configuration
 * @return parsed key/value pairs; an empty {@link Properties} when the setting is absent
 */
protected static Properties parseJdbcPropertiesMap(Map<String, String> conf) {
  Properties jdbcProperties = new Properties();
  String rawJdbcMap = conf.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_JDBC_MAP);
  if (rawJdbcMap != null) {
    jdbcProperties.putAll(Utils.parseCommaSeparatedKeyValuePairs(rawJdbcMap));
  }
  return jdbcProperties;
}

/**
* convert ingest status to ingested file status
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.snowflake.kafka.connector.internal;

import java.util.Properties;

/** Wrapper class for all snowflake jdbc properties */
/** Wrapper class for all snowflake jdbc properties */
public class JdbcProperties {

  /** All jdbc properties: connection, proxy and user-supplied jdbc-map entries combined */
  private final Properties properties;
  /** Proxy related properties only, kept separately for callers that need just the proxy view */
  private final Properties proxyProperties;

  private JdbcProperties(Properties combinedProperties, Properties proxyProperties) {
    this.properties = combinedProperties;
    this.proxyProperties = proxyProperties;
  }

  /** @return all combined jdbc properties (connection + proxy + user-supplied map) */
  public Properties getProperties() {
    return properties;
  }

  /** @return the string value for {@code key}, or {@code null} if absent */
  public String getProperty(String key) {
    return properties.getProperty(key);
  }

  /** @return the raw object value for {@code key} (e.g. a PrivateKey), or {@code null} if absent */
  public Object get(String key) {
    return properties.get(key);
  }

  /** @return only the proxy related properties */
  public Properties getProxyProperties() {
    return proxyProperties;
  }

  /**
   * Combine all jdbc related properties. Throws error if jdbcPropertiesMap overrides any property
   * defined in connectionProperties or proxyProperties.
   *
   * @param connectionProperties snowflake.database.name, snowflake.schema.name,
   *     snowflake.private.key etc.
   * @param proxyProperties jvm.proxy.xxx
   * @param jdbcPropertiesMap snowflake.jdbc.map
   * @return merged, validated set of jdbc properties
   * @throws com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException wrapped
   *     ERROR_0031 when jdbcPropertiesMap duplicates an already-defined property
   */
  static JdbcProperties create(
      Properties connectionProperties, Properties proxyProperties, Properties jdbcPropertiesMap) {
    InternalUtils.assertNotEmpty("connectionProperties", connectionProperties);
    proxyProperties = setEmptyIfNull(proxyProperties);
    jdbcPropertiesMap = setEmptyIfNull(jdbcPropertiesMap);

    // merge connection + proxy first so the user-supplied map can be validated against both
    Properties proxyAndConnection = mergeProperties(connectionProperties, proxyProperties);
    detectOverrides(proxyAndConnection, jdbcPropertiesMap);

    Properties combinedProperties = mergeProperties(proxyAndConnection, jdbcPropertiesMap);

    return new JdbcProperties(combinedProperties, proxyProperties);
  }

  /** Test method */
  static JdbcProperties create(Properties connectionProperties) {
    return create(connectionProperties, new Properties(), new Properties());
  }

  /** Fails fast if the user-supplied jdbc map would silently override a managed property. */
  private static void detectOverrides(Properties proxyAndConnection, Properties jdbcPropertiesMap) {
    jdbcPropertiesMap.forEach(
        (k, v) -> {
          if (proxyAndConnection.containsKey(k)) {
            throw SnowflakeErrors.ERROR_0031.getException("Duplicated property: " + k);
          }
        });
  }

  /**
   * Merges two property sets into a new {@link Properties}; on key collision the second argument
   * wins (callers guard against unintended collisions via {@link #detectOverrides}).
   */
  private static Properties mergeProperties(
      Properties connectionProperties, Properties proxyProperties) {
    Properties mergedProperties = new Properties();
    mergedProperties.putAll(connectionProperties);
    mergedProperties.putAll(proxyProperties);
    return mergedProperties;
  }

  /** Parsing methods should not return null; this guards against it just in case. */
  private static Properties setEmptyIfNull(Properties properties) {
    return properties != null ? properties : new Properties();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public static SnowflakeConnectionServiceBuilder builder() {
}

public static class SnowflakeConnectionServiceBuilder {
private Properties prop;
private Properties proxyProperties;

private JdbcProperties jdbcProperties;
private SnowflakeURL url;
private String connectorName;
private String taskID = "-1";
Expand All @@ -30,15 +30,15 @@ public static class SnowflakeConnectionServiceBuilder {
private IngestionMethodConfig ingestionMethodConfig;

@VisibleForTesting
public SnowflakeConnectionServiceBuilder setProperties(Properties prop) {
this.prop = prop;
public SnowflakeConnectionServiceBuilder setProperties(Properties connectionProperties) {
this.jdbcProperties = JdbcProperties.create(connectionProperties);
this.ingestionMethodConfig = IngestionMethodConfig.SNOWPIPE;
return this;
}

// For testing only
public Properties getProperties() {
return this.prop;
return this.jdbcProperties.getProperties();
}

public SnowflakeConnectionServiceBuilder setURL(SnowflakeURL url) {
Expand All @@ -63,24 +63,24 @@ public SnowflakeConnectionServiceBuilder setProperties(Map<String, String> conf)
this.url = new SnowflakeURL(conf.get(Utils.SF_URL));
this.kafkaProvider =
SnowflakeSinkConnectorConfig.KafkaProvider.of(conf.get(PROVIDER_CONFIG)).name();
// TODO: Ideally only one property is required, but because we dont pass it around in JDBC and
// snowpipe SDK,
// it is better if we have two properties decoupled
// Right now, proxy parameters are picked from jvm system properties, in future they need to
// be decoupled
this.proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf);
this.connectorName = conf.get(Utils.NAME);
this.ingestionMethodConfig = IngestionMethodConfig.determineIngestionMethod(conf);
this.prop = InternalUtils.createProperties(conf, this.url, ingestionMethodConfig);

Properties proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf);
Properties connectionProperties =
InternalUtils.createProperties(conf, this.url, ingestionMethodConfig);
Properties jdbcPropertiesMap = InternalUtils.parseJdbcPropertiesMap(conf);
this.jdbcProperties =
JdbcProperties.create(connectionProperties, proxyProperties, jdbcPropertiesMap);
return this;
}

public SnowflakeConnectionService build() {
InternalUtils.assertNotEmpty("properties", prop);
InternalUtils.assertNotEmpty("jdbcProperties", jdbcProperties);
InternalUtils.assertNotEmpty("url", url);
InternalUtils.assertNotEmpty("connectorName", connectorName);
return new SnowflakeConnectionServiceV1(
prop, url, connectorName, taskID, proxyProperties, kafkaProvider, ingestionMethodConfig);
jdbcProperties, url, connectorName, taskID, kafkaProvider, ingestionMethodConfig);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService
private final SnowflakeTelemetryService telemetry;
private final String connectorName;
private final String taskID;
private final Properties prop;
private final JdbcProperties jdbcProperties;

// Placeholder for all proxy related properties set in the connector configuration
private final Properties proxyProperties;
private final SnowflakeURL url;
private final SnowflakeInternalStage internalStage;

Expand All @@ -60,30 +58,27 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

SnowflakeConnectionServiceV1(
Properties prop,
JdbcProperties jdbcProperties,
SnowflakeURL url,
String connectorName,
String taskID,
Properties proxyProperties,
String kafkaProvider,
IngestionMethodConfig ingestionMethodConfig) {
this.jdbcProperties = jdbcProperties;
this.connectorName = connectorName;
this.taskID = taskID;
this.url = url;
this.prop = prop;
this.stageType = null;
this.proxyProperties = proxyProperties;
this.kafkaProvider = kafkaProvider;
Properties proxyProperties = jdbcProperties.getProxyProperties();
Properties combinedProperties = jdbcProperties.getProperties();
try {
if (proxyProperties != null && !proxyProperties.isEmpty()) {
Properties combinedProperties =
mergeProxyAndConnectionProperties(this.prop, this.proxyProperties);
if (!proxyProperties.isEmpty()) {
LOGGER.debug("Proxy properties are set, passing in JDBC while creating the connection");
this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), combinedProperties);
} else {
LOGGER.info("Establishing a JDBC connection with url:{}", url.getJdbcUrl());
this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), prop);
}
this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), combinedProperties);
} catch (SQLException e) {
throw SnowflakeErrors.ERROR_1001.getException(e);
}
Expand All @@ -99,17 +94,6 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService
LOGGER.info("initialized the snowflake connection");
}

/* Merges the two properties. */
private static Properties mergeProxyAndConnectionProperties(
Properties connectionProperties, Properties proxyProperties) {
assert connectionProperties != null;
assert proxyProperties != null;
Properties mergedProperties = new Properties();
mergedProperties.putAll(connectionProperties);
mergedProperties.putAll(proxyProperties);
return mergedProperties;
}

@Override
public void createTable(final String tableName, final boolean overwrite) {
checkConnection();
Expand Down Expand Up @@ -931,19 +915,19 @@ public String getConnectorName() {
public SnowflakeIngestionService buildIngestService(
final String stageName, final String pipeName) {
String account = url.getAccount();
String user = prop.getProperty(InternalUtils.JDBC_USER);
String user = jdbcProperties.getProperty(InternalUtils.JDBC_USER);
String userAgentSuffixInHttpRequest =
String.format(USER_AGENT_SUFFIX_FORMAT, Utils.VERSION, kafkaProvider);
String host = url.getUrlWithoutPort();
int port = url.getPort();
String connectionScheme = url.getScheme();
String fullPipeName =
prop.getProperty(InternalUtils.JDBC_DATABASE)
jdbcProperties.getProperty(InternalUtils.JDBC_DATABASE)
+ "."
+ prop.getProperty(InternalUtils.JDBC_SCHEMA)
+ jdbcProperties.getProperty(InternalUtils.JDBC_SCHEMA)
+ "."
+ pipeName;
PrivateKey privateKey = (PrivateKey) prop.get(InternalUtils.JDBC_PRIVATE_KEY);
PrivateKey privateKey = (PrivateKey) jdbcProperties.get(InternalUtils.JDBC_PRIVATE_KEY);
return SnowflakeIngestionServiceFactory.builder(
account,
user,
Expand Down Expand Up @@ -1027,9 +1011,9 @@ public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
InternalUtils.assertNotEmpty("sourceChannelName", sourceChannelName);
InternalUtils.assertNotEmpty("destinationChannelName", destinationChannelName);
String fullyQualifiedTableName =
prop.getProperty(InternalUtils.JDBC_DATABASE)
jdbcProperties.getProperty(InternalUtils.JDBC_DATABASE)
+ "."
+ prop.getProperty(InternalUtils.JDBC_SCHEMA)
+ jdbcProperties.getProperty(InternalUtils.JDBC_SCHEMA)
+ "."
+ tableName;
String query = "select SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN((?), (?), (?));";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public enum SnowflakeErrors {
String.format(
"Failed to parse %s map",
SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP)),

ERROR_0031(
"0031",
"Failed to combine JDBC properties",
"One of snowflake.jdbc.map property overrides other jdbc property"),
// Snowflake connection issues 1---
ERROR_1001(
"1001",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.snowflake.kafka.connector.internal;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.mock.MockResultSetForSizeTest;
import java.sql.ResultSet;
Expand All @@ -8,7 +10,7 @@
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.connection.IngestStatus;
import org.junit.Test;
import org.junit.jupiter.api.Test;

public class InternalUtilsTest {
@Test
Expand Down Expand Up @@ -134,4 +136,18 @@ public void testResultSize() throws SQLException {
resultSet = new MockResultSetForSizeTest(100);
assert InternalUtils.resultSize(resultSet) == 100;
}

@Test
public void parseJdbcPropertiesMapTest() {
  // given: a comma separated key:value list under the snowflake.jdbc.map config key
  String key = "snowflake.jdbc.map";
  String input =
      "isInsecureMode:true, disableSamlURLCheck:false, passcodeInPassword:on, foo:bar,"
          + " networkTimeout:100";
  Map<String, String> config = new HashMap<>();
  config.put(key, input);
  // when
  Properties jdbcPropertiesMap = InternalUtils.parseJdbcPropertiesMap(config);
  // then: JUnit's assertEquals takes (expected, actual) — expected value goes first
  assertEquals(5, jdbcPropertiesMap.size());
  assertEquals("true", jdbcPropertiesMap.getProperty("isInsecureMode"));
}
}
Loading

0 comments on commit 3dabfc0

Please sign in to comment.