NO-SNOW Revert one client change, JDBC upgrade and config for arrow and parquet (#555)

* Revert "no-snow Add connector name in KC client name for better debugging the client … (#547)"

This reverts commit f7df64d.

* Revert "Refactor sink task integration tests to be more readable (#543)"

This reverts commit 1b24187.

* Ignore flaky test

* Revert "Fix client off by one issue (#540)"

This reverts commit a5e9b45.

* Revert "[SNOW-692657] Create one client per connector instead of one client per task (#528)"

This reverts commit 0b4b1cb.

* Revert "SNOW-740327 Create config for using Arrow BDEC file format + Bump Client SDK 1.0.3-beta (#541)"

This reverts commit c5f8aed.

* Revert "[SNOW-726924] Add New jvm.nonProxy.hosts Parameter and update JDBC to 3.13.23 (#533)"

This reverts commit d2c3561.

* Revert "Modify pom.xml to use jdbc defined in connector and not from ingest sdk (#546)"

This reverts commit 79051a1.
sfc-gh-japatel authored Mar 1, 2023
1 parent 2786090 commit d9b0beb
Showing 24 changed files with 837 additions and 1,994 deletions.
20 changes: 7 additions & 13 deletions pom.xml
@@ -313,24 +313,11 @@
</exclusions>
</dependency>

<!--JDBC driver for building connection with Snowflake-->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.23</version>
</dependency>

<!-- Ingest SDK for copy staged file into snowflake table -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -408,6 +395,13 @@
<version>7.2.1</version>
</dependency>

<!--JDBC driver for building connection with Snowflake-->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.14</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
20 changes: 7 additions & 13 deletions pom_confluent.xml
@@ -364,24 +364,11 @@
</exclusions>
</dependency>

<!--JDBC driver for building connection with Snowflake-->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.23</version>
</dependency>

<!-- Ingest SDK for copy staged file into snowflake table -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -459,6 +446,13 @@
<version>7.2.1</version>
</dependency>

<!--JDBC driver for building connection with Snowflake-->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.14</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
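Both POM files get the identical change: the revert drops the standalone JDBC 3.13.23 pin (and the exclusion of the driver bundled with snowflake-ingest-sdk) and restores the 3.13.14 pin declared further down the dependency list. To confirm which driver actually wins on the assembled classpath, here is a minimal sketch using only the standard java.sql.Driver contract and jar manifest metadata — illustrative tooling, not connector code:

```java
import java.sql.Driver;

public class JdbcVersionCheck {
    public static void main(String[] args) throws Exception {
        // Load whichever Snowflake JDBC driver Maven resolved onto the classpath.
        Class<?> driverClass = Class.forName("net.snowflake.client.jdbc.SnowflakeDriver");
        Driver driver = (Driver) driverClass.getDeclaredConstructor().newInstance();

        // Major/minor versions come from the java.sql.Driver contract; the full
        // version string is read from the jar manifest (may be null if absent).
        System.out.printf("snowflake-jdbc %d.%d (%s)%n",
                driver.getMajorVersion(),
                driver.getMinorVersion(),
                driverClass.getPackage().getImplementationVersion());
    }
}
```

Run against the reverted POMs this should report 3.13.14; if 3.13.23 still appears, a transitive copy is shadowing the pin.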
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java
@@ -21,8 +21,6 @@
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import com.snowflake.kafka.connector.internal.ingestsdk.IngestSdkProvider;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import java.util.ArrayList;
import java.util.HashMap;
@@ -44,15 +42,12 @@
* running on Kafka Connect Workers.
*/
public class SnowflakeSinkConnector extends SinkConnector {
// TEMPORARY config of num tasks assigned per client, round up if number is not divisible
// currently set to 1 for a 1:1 task to client ratio, so we can maintain the current functionality
private static final int NUM_TASK_TO_CLIENT = 1;

// create logger without correlationId for now
private static LoggerHandler LOGGER = new LoggerHandler(SnowflakeSinkConnector.class.getName());

private Map<String, String> config; // connector configuration, provided by
// user through kafka connect framework
private String connectorName; // unique name of this connector instance

// SnowflakeJDBCWrapper provides methods to interact with user's snowflake
// account and executes queries
@@ -69,12 +64,6 @@ public class SnowflakeSinkConnector extends SinkConnector {
// Using setupComplete to synchronize
private boolean setupComplete;

// The id of this connector instance. Should only be reset on start
private String kcInstanceId;

// If this connector is configured to use streaming snowpipe ingestion
private boolean usesStreamingIngestion;

/** No-Arg constructor. Required by Kafka Connect framework */
public SnowflakeSinkConnector() {
setupComplete = false;
@@ -100,8 +89,7 @@ public void start(final Map<String, String> parsedConfig) {
connectorStartTime = System.currentTimeMillis();

// initialize logging with global instance Id
this.kcInstanceId = this.getKcInstanceId(this.connectorStartTime);
LoggerHandler.setConnectGlobalInstanceId(kcInstanceId);
LoggerHandler.setConnectGlobalInstanceId(this.getKcInstanceId(this.connectorStartTime));

config = new HashMap<>(parsedConfig);

@@ -119,14 +107,6 @@
// config as a side effect
conn = SnowflakeConnectionServiceFactory.builder().setProperties(config).build();

// check if we are using snowpipe streaming ingestion
this.usesStreamingIngestion =
config != null
&& config.get(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT) != null
&& config
.get(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT)
.equalsIgnoreCase(IngestionMethodConfig.SNOWPIPE_STREAMING.toString());

telemetryClient = conn.getTelemetryClient();

telemetryClient.reportKafkaConnectStart(connectorStartTime, this.config);
@@ -147,9 +127,6 @@ public void stop() {
// set task logging to default
SnowflakeSinkTask.setTotalTaskCreationCount(-1);
setupComplete = false;

IngestSdkProvider.getStreamingClientManager().closeAllStreamingClients();

LOGGER.info("SnowflakeSinkConnector:stop");
telemetryClient.reportKafkaConnectStop(connectorStartTime);
}
@@ -179,13 +156,6 @@ public Class<? extends Task> taskClass() {
*/
@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
// create all necessary clients, evenly mapping tasks to clients
// must be done here instead of start() because we need the maxTasks value
if (this.usesStreamingIngestion) {
IngestSdkProvider.getStreamingClientManager()
.createAllStreamingClients(config, kcInstanceId, maxTasks, NUM_TASK_TO_CLIENT);
}

// wait for setup to complete
int counter = 0;
while (counter < 120) // poll for 120*5 seconds (10 mins) maximum
@@ -206,7 +176,6 @@ public List<Map<String, String>> taskConfigs(final int maxTasks) {
throw SnowflakeErrors.ERROR_5007.getException(telemetryClient);
}

// taskIds must be consecutive, the StreamingClientManager relies on this
List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; i++) {
Map<String, String> conf = new HashMap<>(config);
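For context on what this file loses: the reverted one-client-per-connector code assigned consecutive task ids to shared streaming clients at a fixed tasks-per-client ratio (NUM_TASK_TO_CLIENT, pinned to 1 for a 1:1 mapping), rounding the client count up when maxTasks was not evenly divisible — hence the removed comment that task ids must be consecutive. A minimal sketch of that arithmetic; the class and method names are illustrative, not the StreamingClientManager API:

```java
import java.util.HashMap;
import java.util.Map;

public class TaskToClientMapping {
    /**
     * Assigns each task id a client id so that every client serves at most
     * tasksPerClient tasks. The client count is effectively
     * ceil(maxTasks / tasksPerClient), rounding up on a remainder.
     */
    static Map<Integer, Integer> mapTasksToClients(int maxTasks, int tasksPerClient) {
        if (maxTasks <= 0 || tasksPerClient <= 0) {
            // Mirrors the removed ERROR_3011: "Given task count or ratio should be above 0"
            throw new IllegalArgumentException("task count and ratio must be above 0");
        }
        Map<Integer, Integer> taskToClient = new HashMap<>();
        for (int taskId = 0; taskId < maxTasks; taskId++) {
            // Integer division only groups correctly because task ids are consecutive.
            taskToClient.put(taskId, taskId / tasksPerClient);
        }
        return taskToClient;
    }

    public static void main(String[] args) {
        // 5 tasks at 2 per client -> 3 clients: {0=0, 1=0, 2=1, 3=1, 4=2}
        System.out.println(mapTasksToClients(5, 2));
    }
}
```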
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -78,7 +78,6 @@ public class SnowflakeSinkConnectorConfig {
private static final String PROXY_INFO = "Proxy Info";
public static final String JVM_PROXY_HOST = "jvm.proxy.host";
public static final String JVM_PROXY_PORT = "jvm.proxy.port";
public static final String JVM_NON_PROXY_HOSTS = "jvm.nonProxy.hosts";
public static final String JVM_PROXY_USERNAME = "jvm.proxy.username";
public static final String JVM_PROXY_PASSWORD = "jvm.proxy.password";

@@ -112,10 +111,6 @@ public class SnowflakeSinkConnectorConfig {
public static final String INGESTION_METHOD_DEFAULT_SNOWPIPE =
IngestionMethodConfig.SNOWPIPE.toString();

// This is the streaming bdec file version which can be defined in config
// NOTE: Please do not override this value unless recommended from snowflake
public static final String SNOWPIPE_STREAMING_FILE_VERSION = "snowflake.streaming.file.version";

// TESTING
public static final String REBALANCING = "snowflake.test.rebalancing";
public static final boolean REBALANCING_DEFAULT = false;
@@ -307,24 +302,14 @@ static ConfigDef newConfigDef() {
1,
ConfigDef.Width.NONE,
JVM_PROXY_PORT)
.define(
JVM_NON_PROXY_HOSTS,
Type.STRING,
"",
Importance.LOW,
"JVM option: http.nonProxyHosts",
PROXY_INFO,
2,
ConfigDef.Width.NONE,
JVM_NON_PROXY_HOSTS)
.define(
JVM_PROXY_USERNAME,
Type.STRING,
"",
Importance.LOW,
"JVM proxy username",
PROXY_INFO,
3,
2,
ConfigDef.Width.NONE,
JVM_PROXY_USERNAME)
.define(
@@ -334,7 +319,7 @@ static ConfigDef newConfigDef() {
Importance.LOW,
"JVM proxy password",
PROXY_INFO,
4,
3,
ConfigDef.Width.NONE,
JVM_PROXY_PASSWORD)
// Connector Config
@@ -478,18 +463,6 @@ static ConfigDef newConfigDef() {
5,
ConfigDef.Width.NONE,
INGESTION_METHOD_OPT)
.define(
SNOWPIPE_STREAMING_FILE_VERSION,
Type.STRING,
"", // default is handled in Ingest SDK
null, // no validator
Importance.LOW,
"Acceptable values for Snowpipe Streaming BDEC Versions: 1 and 3. Check Ingest"
+ " SDK for default behavior. Please do not set this unless Absolutely needed. ",
CONNECTOR_CONFIG,
6,
ConfigDef.Width.NONE,
SNOWPIPE_STREAMING_FILE_VERSION)
.define(
ERRORS_TOLERANCE_CONFIG,
Type.STRING,
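The orderInGroup renumbering above (3 → 2, 4 → 3) keeps the Proxy Info group contiguous once jvm.nonProxy.hosts is gone. For reference, the positional arguments of Kafka's ConfigDef.define line up as follows — a sketch built around one surviving key, not the connector's full ConfigDef:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

public class ProxyConfigDefSketch {
    static ConfigDef proxyGroup() {
        return new ConfigDef()
                .define(
                        "jvm.proxy.username", // config key
                        Type.STRING,          // value type
                        "",                   // default value
                        Importance.LOW,       // importance
                        "JVM proxy username", // documentation
                        "Proxy Info",         // group
                        2,                    // orderInGroup: position within the group
                        Width.NONE,           // UI width hint
                        "jvm.proxy.username"  // display name
                );
    }
}
```

Leaving a gap in orderInGroup would not break config parsing — it only orders generated documentation — but keeping the indices dense keeps the rendered docs tidy.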
26 changes: 1 addition & 25 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -81,7 +81,6 @@ public class Utils {
public static final String HTTPS_PROXY_PORT = "https.proxyPort";
public static final String HTTP_PROXY_HOST = "http.proxyHost";
public static final String HTTP_PROXY_PORT = "http.proxyPort";
public static final String HTTP_NON_PROXY_HOSTS = "http.nonProxyHosts";

public static final String JDK_HTTP_AUTH_TUNNELING = "jdk.http.auth.tunneling.disabledSchemes";
public static final String HTTPS_PROXY_USER = "https.proxyUser";
@@ -225,7 +224,6 @@ static void validateProxySetting(Map<String, String> config) {
String port =
SnowflakeSinkConnectorConfig.getProperty(
config, SnowflakeSinkConnectorConfig.JVM_PROXY_PORT);

// either both host and port are provided or none of them are provided
if (host != null ^ port != null) {
throw SnowflakeErrors.ERROR_0022.getException(
@@ -264,12 +262,8 @@ static boolean enableJVMProxy(Map<String, String> config) {
String port =
SnowflakeSinkConnectorConfig.getProperty(
config, SnowflakeSinkConnectorConfig.JVM_PROXY_PORT);
String nonProxyHosts =
SnowflakeSinkConnectorConfig.getProperty(
config, SnowflakeSinkConnectorConfig.JVM_NON_PROXY_HOSTS);
if (host != null && port != null) {
LOGGER.info(
"enable jvm proxy: {}:{} and bypass proxy for hosts: {}", host, port, nonProxyHosts);
LOGGER.info("enable jvm proxy: {}:{}", host, port);

// enable https proxy
System.setProperty(HTTP_USE_PROXY, "true");
Expand All @@ -278,17 +272,6 @@ static boolean enableJVMProxy(Map<String, String> config) {
System.setProperty(HTTPS_PROXY_HOST, host);
System.setProperty(HTTPS_PROXY_PORT, port);

// If the user provided the jvm.nonProxy.hosts configuration then we
// will append that to the list provided by the JVM argument
// -Dhttp.nonProxyHosts and not override it altogether, if it exists.
if (nonProxyHosts != null) {
nonProxyHosts =
(System.getProperty(HTTP_NON_PROXY_HOSTS) != null)
? System.getProperty(HTTP_NON_PROXY_HOSTS) + "|" + nonProxyHosts
: nonProxyHosts;
System.setProperty(HTTP_NON_PROXY_HOSTS, nonProxyHosts);
}

// set username and password
String username =
SnowflakeSinkConnectorConfig.getProperty(
@@ -382,13 +365,6 @@ static String validateConfig(Map<String, String> config) {
"Schematization is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
}
if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION)) {
configIsValid = false;
LOGGER.error(
"{} is only available with ingestion type: {}.",
SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
}
}

if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)
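With the merge logic gone, enableJVMProxy now sets only the proxy host and port; a bypass list must be supplied via the JVM flag -Dhttp.nonProxyHosts rather than through connector config. A minimal sketch of the system properties involved and the pipe-separated format the JVM expects — host values here are made up:

```java
public class JvmProxySetup {
    public static void main(String[] args) {
        // What the post-revert enableJVMProxy still sets, from jvm.proxy.host/port:
        System.setProperty("http.useProxy", "true");
        System.setProperty("http.proxyHost", "proxy.example.com");
        System.setProperty("http.proxyPort", "8080");
        System.setProperty("https.proxyHost", "proxy.example.com");
        System.setProperty("https.proxyPort", "8080");

        // The bypass list is no longer merged in from connector config. After the
        // revert it can only arrive as a JVM argument, e.g.
        //   -Dhttp.nonProxyHosts="localhost|*.snowflakecomputing.com"
        // Entries are pipe-separated; a leading '*' wildcard matches subdomains.
        System.out.println(System.getProperty("http.nonProxyHosts"));
    }
}
```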
src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java
@@ -202,13 +202,6 @@ protected static Properties generateProxyParametersIfRequired(Map<String, String
SFSessionProperty.PROXY_PORT.getPropertyKey(),
conf.get(SnowflakeSinkConnectorConfig.JVM_PROXY_PORT));

// nonProxyHosts parameter is not required. Check if it was set or not.
if (conf.get(SnowflakeSinkConnectorConfig.JVM_NON_PROXY_HOSTS) != null) {
proxyProperties.put(
SFSessionProperty.NON_PROXY_HOSTS.getPropertyKey(),
conf.get(SnowflakeSinkConnectorConfig.JVM_NON_PROXY_HOSTS));
}

// For username and password, check if host and port are given.
// If they are given, check if username and password are non null
String username = conf.get(SnowflakeSinkConnectorConfig.JVM_PROXY_USERNAME);
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
@@ -282,11 +282,4 @@ public interface SnowflakeConnectionService {
* @param tableName table name
*/
void createTableWithOnlyMetadataColumn(String tableName);

/**
* Gets the task id for this snowflake connection
*
* @return The task id for the snowflake connection
*/
int getTaskId();
}
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
@@ -1001,9 +1001,4 @@ public Connection getConnection() {
public SnowflakeInternalStage getInternalStage() {
return this.internalStage;
}

@Override
public int getTaskId() {
return Integer.parseInt(this.taskID);
}
}
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
@@ -192,13 +192,6 @@ public enum SnowflakeErrors {
ERROR_3006("3006", "Failed to configure client status", "Exception reported by Ingest SDK"),
ERROR_3007("3007", "Failed to get client status", "Exception reported by Ingest SDK"),
ERROR_3008("3008", "Failed to ingest file with client info", "Exception reported by Ingest SDK"),
ERROR_3009("3009", "Failed to get streaming ingest client", "The client must be initialized"),
ERROR_3010(
"3010",
"Invalid task id(s)",
"Task id(s) was not correctly mapped to a streaming ingest client"),
ERROR_3011(
"3011", "Invalid client task map parameters", "Given task count or ratio should be above 0"),
// Wrong result issues 4---
ERROR_4001("4001", "Unexpected Result", "Get wrong results from Snowflake service"),
// Connector internal errors 5---
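The deleted constants follow the enum's three-argument pattern of code, message, and detail; call sites raise them through getException, as in the ERROR_5007 use in taskConfigs above. A stripped-down sketch of that pattern — not the real enum, which also wires telemetry into getException:

```java
public enum SketchErrors {
    ERROR_5007("5007", "Setup timeout", "Connector setup did not complete in time");

    private final String code;
    private final String message;
    private final String detail;

    SketchErrors(String code, String message, String detail) {
        this.code = code;
        this.message = message;
        this.detail = detail;
    }

    public RuntimeException getException() {
        return new RuntimeException(code + ": " + message + " - " + detail);
    }
}
```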