From 4450fc45fc99a5fb830598fa7f7eee705fa00cf5 Mon Sep 17 00:00:00 2001 From: Alkin Sen <120425561+sfc-gh-asen@users.noreply.github.com> Date: Wed, 25 Oct 2023 16:01:36 -0700 Subject: [PATCH] SNOW-949967 Add an optional offset token parameter for openChannel (#645) --- .../ingest/streaming/OpenChannelRequest.java | 22 ++++++++++++ ...nowflakeStreamingIngestClientInternal.java | 3 ++ .../SnowflakeStreamingIngestChannelTest.java | 19 ++++++++++ .../streaming/internal/StreamingIngestIT.java | 35 +++++++++++++++++++ 4 files changed, 79 insertions(+) diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index f48288330..d30472b63 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -40,6 +40,9 @@ public enum OnErrorOption { // Default timezone for TIMESTAMP_LTZ and TIMESTAMP_TZ columns private final ZoneId defaultTimezone; + private final String offsetToken; + private final boolean isOffsetTokenProvided; + public static OpenChannelRequestBuilder builder(String channelName) { return new OpenChannelRequestBuilder(channelName); } @@ -53,6 +56,9 @@ public static class OpenChannelRequestBuilder { private OnErrorOption onErrorOption; private ZoneId defaultTimezone; + private String offsetToken; + private boolean isOffsetTokenProvided = false; + public OpenChannelRequestBuilder(String channelName) { this.channelName = channelName; this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE; @@ -83,6 +89,12 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) { return this; } + public OpenChannelRequestBuilder setOffsetToken(String offsetToken){ + this.offsetToken = offsetToken; + this.isOffsetTokenProvided = true; + return this; + } + public OpenChannelRequest build() { return new OpenChannelRequest(this); } @@ -102,6 +114,8 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) { this.tableName = builder.tableName; this.onErrorOption = builder.onErrorOption; this.defaultTimezone = builder.defaultTimezone; + this.offsetToken = builder.offsetToken; + this.isOffsetTokenProvided = builder.isOffsetTokenProvided; } public String getDBName() { @@ -131,4 +145,12 @@ public String getFullyQualifiedTableName() { public OnErrorOption getOnErrorOption() { return this.onErrorOption; } + + public String getOffsetToken() { + return this.offsetToken; + } + + public boolean isOffsetTokenProvided() { + return this.isOffsetTokenProvided; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 27ff9407e..a52149331 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -304,6 +304,9 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest payload.put("schema", request.getSchemaName()); payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name()); payload.put("role", this.role); + if (request.isOffsetTokenProvided()){ + payload.put("offset_token", request.getOffsetToken()); + } OpenChannelResponse response = executeWithRetries( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index a1e26b63d..037028086 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -263,6 +263,25 @@ public void testOpenChannelRequestCreationSuccess() { Assert.assertEquals( "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); + Assert.assertFalse(request.isOffsetTokenProvided()); + } + + + @Test + public void testOpenChannelRequesCreationtWithOffsetToken() { + OpenChannelRequest request = + OpenChannelRequest.builder("CHANNEL") + .setDBName("STREAMINGINGEST_TEST") + .setSchemaName("PUBLIC") + .setTableName("T_STREAMINGINGEST") + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_TOKEN") + .build(); + + Assert.assertEquals( + "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); + Assert.assertEquals("TEST_TOKEN", request.getOffsetToken()); + Assert.assertTrue(request.isOffsetTokenProvided()); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index 95cc5c699..eadbe81ee 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -532,6 +532,41 @@ public void testMultiColumnIngest() throws Exception { Assert.fail("Row sequencer not updated before timeout"); } + @Test + public void testOpenChannelOffsetToken() throws Exception { + String tableName = "offsetTokenTest"; + jdbcConnection + .createStatement() + .execute( + String.format( + "create or replace table %s (s text);", + tableName)); + OpenChannelRequest request1 = + OpenChannelRequest.builder("TEST_CHANNEL") + .setDBName(testDb) + .setSchemaName(TEST_SCHEMA) + .setTableName(tableName) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_OFFSET") + .build(); + + // Open a streaming ingest channel from the given client + SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1); + + // Close the channel after insertion + channel1.close().get(); + + for (int i = 1; i < 15; i++) { + if (channel1.getLatestCommittedOffsetToken() != null + && channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) { + return; + } else { + Thread.sleep(2000); + } + } + Assert.fail("Row sequencer not updated before timeout"); + } + @Test public void testNullableColumns() throws Exception { String multiTableName = "multi_column";