diff --git a/pom.xml b/pom.xml index bae78ccf0..0f18b78c3 100644 --- a/pom.xml +++ b/pom.xml @@ -338,7 +338,7 @@ net.snowflake snowflake-ingest-sdk - 2.3.0 + 3.0.0 net.snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java index 03f5ce65c..c9c97f994 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java @@ -23,7 +23,6 @@ import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; import net.snowflake.ingest.utils.SFException; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.connect.errors.ConnectException; /** This class handles all calls to manage the streaming ingestion client */ @@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient( .setProperties(streamingClientProperties.clientProperties) .setParameterOverrides(streamingClientProperties.parameterOverrides); - setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled); - SnowflakeStreamingIngestClient createdClient = builder.build(); LOGGER.info( @@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient( } } - private static void setIcebergEnabled( - SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) { - try { - // TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002 - FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true); - } catch (IllegalAccessException e) { - throw new IllegalStateException( - "Couldn't set iceberg by accessing private field: " + "isIceberg", e); - } - } - /** * Closes the given client. Swallows any exceptions * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index acf6bfcce..de8459e96 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -21,6 +21,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; import static net.snowflake.ingest.utils.ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES; @@ -90,6 +91,8 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); + parameterOverrides.put(ENABLE_ICEBERG_STREAMING, isIcebergEnabled); + Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); snowpipeStreamingMaxClientLag.ifPresent(