diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index f28b2fed2..482963f5e 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -97,6 +97,9 @@ public class SnowflakeSinkConnectorConfig { // JDBC trace Info (environment variable) public static final String SNOWFLAKE_JDBC_TRACE = "JDBC_TRACE"; + // JDBC properties map + public static final String SNOWFLAKE_JDBC_MAP = "snowflake.jdbc.map"; + // Snowflake Metadata Flags private static final String SNOWFLAKE_METADATA_FLAGS = "Snowflake Metadata Flags"; public static final String SNOWFLAKE_METADATA_CREATETIME = "snowflake.metadata.createtime"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java index ec07c8d34..0fb12d535 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java @@ -306,6 +306,17 @@ protected static Properties generateProxyParametersIfRequired(Map conf) { + String jdbcConfigMapInput = conf.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_JDBC_MAP); + if (jdbcConfigMapInput == null) { + return new Properties(); + } + Map jdbcMap = Utils.parseCommaSeparatedKeyValuePairs(jdbcConfigMapInput); + Properties properties = new Properties(); + properties.putAll(jdbcMap); + return properties; + } + /** * convert ingest status to ingested file status * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java new file mode 100644 index 000000000..38318033a --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java @@ -0,0 +1,86 @@ +package com.snowflake.kafka.connector.internal; + +import java.util.Properties; + +/** Wrapper class for all snowflake jdbc properties */ +public class JdbcProperties { + + /** All jdbc properties including proxyProperties */ + private final Properties properties; + /** Proxy related properties */ + private final Properties proxyProperties; + + private JdbcProperties(Properties combinedProperties, Properties proxyProperties) { + this.properties = combinedProperties; + this.proxyProperties = proxyProperties; + } + + public Properties getProperties() { + return properties; + } + + public String getProperty(String key) { + return properties.getProperty(key); + } + + public Object get(String key) { + return properties.get(key); + } + + public Properties getProxyProperties() { + return proxyProperties; + } + + /** + * Combine all jdbc related properties. Throws error if jdbcPropertiesMap overrides any property + * defined in connectionProperties or proxyProperties. + * + * @param connectionProperties snowflake.database.name, snowflake.schema,name, + * snowflake.private.key etc. + * @param proxyProperties jvm.proxy.xxx + * @param jdbcPropertiesMap snowflake.jdbc.map + */ + static JdbcProperties create( + Properties connectionProperties, Properties proxyProperties, Properties jdbcPropertiesMap) { + InternalUtils.assertNotEmpty("connectionProperties", connectionProperties); + proxyProperties = setEmptyIfNull(proxyProperties); + jdbcPropertiesMap = setEmptyIfNull(jdbcPropertiesMap); + + Properties proxyAndConnection = mergeProperties(connectionProperties, proxyProperties); + detectOverrides(proxyAndConnection, jdbcPropertiesMap); + + Properties combinedProperties = mergeProperties(proxyAndConnection, jdbcPropertiesMap); + + return new JdbcProperties(combinedProperties, proxyProperties); + } + + /** Test method */ + static JdbcProperties create(Properties connectionProperties) { + return create(connectionProperties, new Properties(), new Properties()); + } + + private static void detectOverrides(Properties proxyAndConnection, Properties jdbcPropertiesMap) { + jdbcPropertiesMap.forEach( + (k, v) -> { + if (proxyAndConnection.containsKey(k)) { + throw SnowflakeErrors.ERROR_0031.getException("Duplicated property: " + k); + } + }); + } + + private static Properties mergeProperties( + Properties connectionProperties, Properties proxyProperties) { + Properties mergedProperties = new Properties(); + mergedProperties.putAll(connectionProperties); + mergedProperties.putAll(proxyProperties); + return mergedProperties; + } + + /** Parsing methods does not return null. However, It's better to be perfectly sure. */ + private static Properties setEmptyIfNull(Properties properties) { + if (properties != null) { + return properties; + } + return new Properties(); + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java index 52a531701..c7bc5bb51 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java @@ -15,8 +15,8 @@ public static SnowflakeConnectionServiceBuilder builder() { } public static class SnowflakeConnectionServiceBuilder { - private Properties prop; - private Properties proxyProperties; + + private JdbcProperties jdbcProperties; private SnowflakeURL url; private String connectorName; private String taskID = "-1"; @@ -30,15 +30,15 @@ public static class SnowflakeConnectionServiceBuilder { private IngestionMethodConfig ingestionMethodConfig; @VisibleForTesting - public SnowflakeConnectionServiceBuilder setProperties(Properties prop) { - this.prop = prop; + public SnowflakeConnectionServiceBuilder setProperties(Properties connectionProperties) { + this.jdbcProperties = JdbcProperties.create(connectionProperties); this.ingestionMethodConfig = IngestionMethodConfig.SNOWPIPE; return this; } // For testing only public Properties getProperties() { - return this.prop; + return this.jdbcProperties.getProperties(); } public SnowflakeConnectionServiceBuilder setURL(SnowflakeURL url) { @@ -63,24 +63,24 @@ public SnowflakeConnectionServiceBuilder setProperties(Map conf) this.url = new SnowflakeURL(conf.get(Utils.SF_URL)); this.kafkaProvider = SnowflakeSinkConnectorConfig.KafkaProvider.of(conf.get(PROVIDER_CONFIG)).name(); - // TODO: Ideally only one property is required, but because we dont pass it around in JDBC and - // snowpipe SDK, - // it is better if we have two properties decoupled - // Right now, proxy parameters are picked from jvm system properties, in future they need to - // be decoupled - this.proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf); this.connectorName = conf.get(Utils.NAME); this.ingestionMethodConfig = IngestionMethodConfig.determineIngestionMethod(conf); - this.prop = InternalUtils.createProperties(conf, this.url, ingestionMethodConfig); + + Properties proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf); + Properties connectionProperties = + InternalUtils.createProperties(conf, this.url, ingestionMethodConfig); + Properties jdbcPropertiesMap = InternalUtils.parseJdbcPropertiesMap(conf); + this.jdbcProperties = + JdbcProperties.create(connectionProperties, proxyProperties, jdbcPropertiesMap); return this; } public SnowflakeConnectionService build() { - InternalUtils.assertNotEmpty("properties", prop); + InternalUtils.assertNotEmpty("jdbcProperties", jdbcProperties); InternalUtils.assertNotEmpty("url", url); InternalUtils.assertNotEmpty("connectorName", connectorName); return new SnowflakeConnectionServiceV1( - prop, url, connectorName, taskID, proxyProperties, kafkaProvider, ingestionMethodConfig); + jdbcProperties, url, connectorName, taskID, kafkaProvider, ingestionMethodConfig); } } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 6bedd93e0..d2d9e1ef2 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -39,10 +39,8 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService private final SnowflakeTelemetryService telemetry; private final String connectorName; private final String taskID; - private final Properties prop; + private final JdbcProperties jdbcProperties; - // Placeholder for all proxy related properties set in the connector configuration - private final Properties proxyProperties; private final SnowflakeURL url; private final SnowflakeInternalStage internalStage; @@ -60,30 +58,27 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); SnowflakeConnectionServiceV1( - Properties prop, + JdbcProperties jdbcProperties, SnowflakeURL url, String connectorName, String taskID, - Properties proxyProperties, String kafkaProvider, IngestionMethodConfig ingestionMethodConfig) { + this.jdbcProperties = jdbcProperties; this.connectorName = connectorName; this.taskID = taskID; this.url = url; - this.prop = prop; this.stageType = null; - this.proxyProperties = proxyProperties; this.kafkaProvider = kafkaProvider; + Properties proxyProperties = jdbcProperties.getProxyProperties(); + Properties combinedProperties = jdbcProperties.getProperties(); try { - if (proxyProperties != null && !proxyProperties.isEmpty()) { - Properties combinedProperties = - mergeProxyAndConnectionProperties(this.prop, this.proxyProperties); + if (!proxyProperties.isEmpty()) { LOGGER.debug("Proxy properties are set, passing in JDBC while creating the connection"); - this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), combinedProperties); } else { LOGGER.info("Establishing a JDBC connection with url:{}", url.getJdbcUrl()); - this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), prop); } + this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), combinedProperties); } catch (SQLException e) { throw SnowflakeErrors.ERROR_1001.getException(e); } @@ -99,17 +94,6 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService LOGGER.info("initialized the snowflake connection"); } - /* Merges the two properties. */ - private static Properties mergeProxyAndConnectionProperties( - Properties connectionProperties, Properties proxyProperties) { - assert connectionProperties != null; - assert proxyProperties != null; - Properties mergedProperties = new Properties(); - mergedProperties.putAll(connectionProperties); - mergedProperties.putAll(proxyProperties); - return mergedProperties; - } - @Override public void createTable(final String tableName, final boolean overwrite) { checkConnection(); @@ -931,19 +915,19 @@ public String getConnectorName() { public SnowflakeIngestionService buildIngestService( final String stageName, final String pipeName) { String account = url.getAccount(); - String user = prop.getProperty(InternalUtils.JDBC_USER); + String user = jdbcProperties.getProperty(InternalUtils.JDBC_USER); String userAgentSuffixInHttpRequest = String.format(USER_AGENT_SUFFIX_FORMAT, Utils.VERSION, kafkaProvider); String host = url.getUrlWithoutPort(); int port = url.getPort(); String connectionScheme = url.getScheme(); String fullPipeName = - prop.getProperty(InternalUtils.JDBC_DATABASE) + jdbcProperties.getProperty(InternalUtils.JDBC_DATABASE) + "." - + prop.getProperty(InternalUtils.JDBC_SCHEMA) + + jdbcProperties.getProperty(InternalUtils.JDBC_SCHEMA) + "." + pipeName; - PrivateKey privateKey = (PrivateKey) prop.get(InternalUtils.JDBC_PRIVATE_KEY); + PrivateKey privateKey = (PrivateKey) jdbcProperties.get(InternalUtils.JDBC_PRIVATE_KEY); return SnowflakeIngestionServiceFactory.builder( account, user, @@ -1027,9 +1011,9 @@ public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken( InternalUtils.assertNotEmpty("sourceChannelName", sourceChannelName); InternalUtils.assertNotEmpty("destinationChannelName", destinationChannelName); String fullyQualifiedTableName = - prop.getProperty(InternalUtils.JDBC_DATABASE) + jdbcProperties.getProperty(InternalUtils.JDBC_DATABASE) + "." - + prop.getProperty(InternalUtils.JDBC_SCHEMA) + + jdbcProperties.getProperty(InternalUtils.JDBC_SCHEMA) + "." + tableName; String query = "select SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN((?), (?), (?));"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 14378a978..14cbe2d51 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -136,7 +136,10 @@ public enum SnowflakeErrors { String.format( "Failed to parse %s map", SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP)), - + ERROR_0031( + "0031", + "Failed to combine JDBC properties", + "One of snowflake.jdbc.map property overrides other jdbc property"), // Snowflake connection issues 1--- ERROR_1001( "1001", diff --git a/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java b/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java index dc6980c00..7d4fc54c1 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java @@ -1,5 +1,7 @@ package com.snowflake.kafka.connector.internal; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.mock.MockResultSetForSizeTest; import java.sql.ResultSet; @@ -8,7 +10,7 @@ import java.util.Map; import java.util.Properties; import net.snowflake.ingest.connection.IngestStatus; -import org.junit.Test; +import org.junit.jupiter.api.Test; public class InternalUtilsTest { @Test @@ -134,4 +136,18 @@ public void testResultSize() throws SQLException { resultSet = new MockResultSetForSizeTest(100); assert InternalUtils.resultSize(resultSet) == 100; } + + @Test + public void parseJdbcPropertiesMapTest() { + String key = "snowflake.jdbc.map"; + String input = + "isInsecureMode:true, disableSamlURLCheck:false, passcodeInPassword:on, foo:bar," + + " networkTimeout:100"; + Map config = new HashMap<>(); + config.put(key, input); + // when + Properties jdbcPropertiesMap = InternalUtils.parseJdbcPropertiesMap(config); + // then + assertEquals(jdbcPropertiesMap.size(), 5); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/JdbcPropertiesTest.java b/src/test/java/com/snowflake/kafka/connector/internal/JdbcPropertiesTest.java new file mode 100644 index 000000000..785d41934 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/JdbcPropertiesTest.java @@ -0,0 +1,74 @@ +package com.snowflake.kafka.connector.internal; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Properties; +import org.junit.jupiter.api.Test; + +public class JdbcPropertiesTest { + + @Test + public void shouldCombineProperties() { + // given + SnowflakeURL url = TestUtils.getUrl(); + Properties connection = InternalUtils.createProperties(TestUtils.getConf(), url); + + Properties proxy = new Properties(); + proxy.put("useProxy", "true"); + + Properties jdbcMap = new Properties(); + jdbcMap.put("insecureMode", "true"); + // when + JdbcProperties jdbcProperties = JdbcProperties.create(connection, proxy, jdbcMap); + // then + int givenPropertiesSize = connection.size() + proxy.size() + jdbcMap.size(); + int mergedPropertiesSize = jdbcProperties.getProperties().size(); + + assertEquals(givenPropertiesSize, mergedPropertiesSize); + } + + @Test + public void shouldThrowWhen_jdbcMap_overridesConnection() { + Properties connection = new Properties(); + connection.put("user", "test_user1"); + + Properties proxy = new Properties(); + + Properties jdbcMap = new Properties(); + jdbcMap.put("user", "test_user2"); + jdbcMap.put("insecureMode", "true"); + // expect + assertThatThrownBy(() -> JdbcProperties.create(connection, proxy, jdbcMap)) + .isInstanceOfSatisfying( + SnowflakeKafkaConnectorException.class, + ex -> { + // property key is printed not value + assertTrue(ex.getMessage().contains("user")); + assertEquals("0031", ex.getCode()); + }); + } + + @Test + public void shouldThrowWhen_jdbcMap_overridesProxy() { + Properties connection = new Properties(); + connection.put("user", "test_user1"); + + Properties proxy = new Properties(); + proxy.put("useProxy", "true"); + + Properties jdbcMap = new Properties(); + jdbcMap.put("useProxy", "true"); + jdbcMap.put("insecureMode", "false"); + // expect + assertThatThrownBy(() -> JdbcProperties.create(connection, proxy, jdbcMap)) + .isInstanceOfSatisfying( + SnowflakeKafkaConnectorException.class, + ex -> { + // property key is printed not value + assertTrue(ex.getMessage().contains("useProxy")); + assertEquals("0031", ex.getCode()); + }); + } +} diff --git a/test/rest_request_template/travis_correct_string_json.json b/test/rest_request_template/travis_correct_string_json.json index 0e935d4d1..a02de9889 100644 --- a/test/rest_request_template/travis_correct_string_json.json +++ b/test/rest_request_template/travis_correct_string_json.json @@ -10,6 +10,7 @@ "snowflake.database.name":"SNOWFLAKE_DATABASE", "snowflake.schema.name":"SNOWFLAKE_SCHEMA", "key.converter":"org.apache.kafka.connect.storage.StringConverter", - "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter" + "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter", + "snowflake.jdbc.map": "isInsecureMode : true, notYetExistingProp : true" } } diff --git a/test/rest_request_template/travis_correct_string_proxy.json b/test/rest_request_template/travis_correct_string_proxy.json index 9852f624f..5c5b62817 100644 --- a/test/rest_request_template/travis_correct_string_proxy.json +++ b/test/rest_request_template/travis_correct_string_proxy.json @@ -14,6 +14,7 @@ "jvm.proxy.host": "localhost", "jvm.proxy.port": "3128", "jvm.proxy.username": "admin", - "jvm.proxy.password": "test" + "jvm.proxy.password": "test", + "snowflake.jdbc.map": "isInsecureMode : true, notYetExistingProp : true" } }