Skip to content

Commit

Permalink
SNOW-1728002 get rid of reflection when enabling iceberg streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bzabek committed Nov 13, 2024
1 parent d6ed5e1 commit ff4f353
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.3.0</version>
<version>3.0.0</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.connect.errors.ConnectException;

/** This class handles all calls to manage the streaming ingestion client */
Expand All @@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient(
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides);

setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled);

SnowflakeStreamingIngestClient createdClient = builder.build();

LOGGER.info(
Expand All @@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient(
}
}

private static void setIcebergEnabled(
SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) {
try {
// TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002
FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true);
} catch (IllegalAccessException e) {
throw new IllegalStateException(
"Couldn't set iceberg by accessing private field: " + "isIceberg", e);
}
}

/**
* Closes the given client. Swallows any exceptions
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES;
Expand Down Expand Up @@ -90,6 +91,8 @@ public StreamingClientProperties(Map<String, String> connectorConfig) {

// Override only if the streaming client properties are explicitly set in config
this.parameterOverrides = new HashMap<>();
parameterOverrides.put(ENABLE_ICEBERG_STREAMING, isIcebergEnabled);

Optional<String> snowpipeStreamingMaxClientLag =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG));
snowpipeStreamingMaxClientLag.ifPresent(
Expand Down

0 comments on commit ff4f353

Please sign in to comment.