diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java index 03f5ce65c..c3c7239d1 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java @@ -23,7 +23,6 @@ import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; import net.snowflake.ingest.utils.SFException; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.connect.errors.ConnectException; /** This class handles all calls to manage the streaming ingestion client */ @@ -47,9 +46,8 @@ public SnowflakeStreamingIngestClient createClient( SnowflakeStreamingIngestClientFactory.builder( streamingClientProperties.clientName + "_" + createdClientId.getAndIncrement()) .setProperties(streamingClientProperties.clientProperties) - .setParameterOverrides(streamingClientProperties.parameterOverrides); - - setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled); + .setParameterOverrides(streamingClientProperties.parameterOverrides) + ; SnowflakeStreamingIngestClient createdClient = builder.build(); @@ -65,17 +63,6 @@ public SnowflakeStreamingIngestClient createClient( } } - private static void setIcebergEnabled( - SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) { - try { - // TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002 - FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true); - } catch (IllegalAccessException e) { - throw new IllegalStateException( - "Couldn't set iceberg by accessing private field: " + "isIceberg", e); - } - } - /** * Closes the given client. Swallows any exceptions * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index acf6bfcce..f82b9add4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -90,6 +90,10 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); + if (isIcebergEnabled) { + // todo extract to field + this.parameterOverrides.put("enable_iceberg_streaming", "true"); + } Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); snowpipeStreamingMaxClientLag.ifPresent(