From f7652f9b9f12b6f380fa7e1ce6ed3bbf4811e080 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Thu, 14 Nov 2024 13:01:40 +0100 Subject: [PATCH] SNOW-1728002 get rid of reflection when enabling iceberg streaming (#995) Co-authored-by: Michal Bobowski --- pom.xml | 2 +- pom_confluent.xml | 2 +- .../streaming/DirectStreamingClientHandler.java | 14 -------------- .../streaming/StreamingClientProperties.java | 4 ++++ .../streaming/iceberg/IcebergIngestionIT.java | 2 ++ 5 files changed, 8 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index bae78ccf0..0f18b78c3 100644 --- a/pom.xml +++ b/pom.xml @@ -338,7 +338,7 @@ net.snowflake snowflake-ingest-sdk - 2.3.0 + 3.0.0 net.snowflake diff --git a/pom_confluent.xml b/pom_confluent.xml index 59224c6e1..61e7f3685 100644 --- a/pom_confluent.xml +++ b/pom_confluent.xml @@ -479,7 +479,7 @@ net.snowflake snowflake-ingest-sdk - 2.3.0 + 3.0.0 net.snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java index 03f5ce65c..c9c97f994 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java @@ -23,7 +23,6 @@ import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; import net.snowflake.ingest.utils.SFException; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.connect.errors.ConnectException; /** This class handles all calls to manage the streaming ingestion client */ @@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient( .setProperties(streamingClientProperties.clientProperties) .setParameterOverrides(streamingClientProperties.parameterOverrides); - setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled); - SnowflakeStreamingIngestClient createdClient = builder.build(); LOGGER.info( @@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient( } } - private static void setIcebergEnabled( - SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) { - try { - // TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002 - FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true); - } catch (IllegalAccessException e) { - throw new IllegalStateException( - "Couldn't set iceberg by accessing private field: " + "isIceberg", e); - } - } - /** * Closes the given client. Swallows any exceptions * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index acf6bfcce..1672f0fc4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -21,6 +21,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; import static net.snowflake.ingest.utils.ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES; @@ -90,6 +91,9 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); + if (isIcebergEnabled) { + parameterOverrides.put(ENABLE_ICEBERG_STREAMING, "true"); + } Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); snowpipeStreamingMaxClientLag.ifPresent( diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java index 11dd31868..2bd6f666c 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java @@ -2,6 +2,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ICEBERG_ENABLED; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER; import static com.snowflake.kafka.connector.internal.TestUtils.getConfForStreaming; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; @@ -49,6 +50,7 @@ public void setUp() { SnowflakeSinkConnectorConfig.setDefaultValues(config); config.put(ICEBERG_ENABLED, "TRUE"); config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString()); + config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true"); createIcebergTable(); enableSchemaEvolution(tableName);