Skip to content

Commit

Permalink
SNOW-949967 Add an optional offset token parameter for openChannel (#645
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sfc-gh-asen authored Oct 25, 2023
1 parent c565025 commit 4450fc4
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public enum OnErrorOption {
// Default timezone for TIMESTAMP_LTZ and TIMESTAMP_TZ columns
private final ZoneId defaultTimezone;

private final String offsetToken;
private final boolean isOffsetTokenProvided;

public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
Expand All @@ -53,6 +56,9 @@ public static class OpenChannelRequestBuilder {
private OnErrorOption onErrorOption;
private ZoneId defaultTimezone;

private String offsetToken;
private boolean isOffsetTokenProvided = false;

public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE;
Expand Down Expand Up @@ -83,6 +89,12 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {
return this;
}

public OpenChannelRequestBuilder setOffsetToken(String offsetToken){
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
}

public OpenChannelRequest build() {
return new OpenChannelRequest(this);
}
Expand All @@ -102,6 +114,8 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.tableName = builder.tableName;
this.onErrorOption = builder.onErrorOption;
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
}

public String getDBName() {
Expand Down Expand Up @@ -131,4 +145,12 @@ public String getFullyQualifiedTableName() {
public OnErrorOption getOnErrorOption() {
return this.onErrorOption;
}

public String getOffsetToken() {
return this.offsetToken;
}

public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
payload.put("schema", request.getSchemaName());
payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name());
payload.put("role", this.role);
if (request.isOffsetTokenProvided()){
payload.put("offset_token", request.getOffsetToken());
}

OpenChannelResponse response =
executeWithRetries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,25 @@ public void testOpenChannelRequestCreationSuccess() {

Assert.assertEquals(
"STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName());
Assert.assertFalse(request.isOffsetTokenProvided());
}


@Test
public void testOpenChannelRequesCreationtWithOffsetToken() {
OpenChannelRequest request =
OpenChannelRequest.builder("CHANNEL")
.setDBName("STREAMINGINGEST_TEST")
.setSchemaName("PUBLIC")
.setTableName("T_STREAMINGINGEST")
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setOffsetToken("TEST_TOKEN")
.build();

Assert.assertEquals(
"STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName());
Assert.assertEquals("TEST_TOKEN", request.getOffsetToken());
Assert.assertTrue(request.isOffsetTokenProvided());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,41 @@ public void testMultiColumnIngest() throws Exception {
Assert.fail("Row sequencer not updated before timeout");
}

@Test
public void testOpenChannelOffsetToken() throws Exception {
String tableName = "offsetTokenTest";
jdbcConnection
.createStatement()
.execute(
String.format(
"create or replace table %s (s text);",
tableName));
OpenChannelRequest request1 =
OpenChannelRequest.builder("TEST_CHANNEL")
.setDBName(testDb)
.setSchemaName(TEST_SCHEMA)
.setTableName(tableName)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setOffsetToken("TEST_OFFSET")
.build();

// Open a streaming ingest channel from the given client
SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);

// Close the channel after insertion
channel1.close().get();

for (int i = 1; i < 15; i++) {
if (channel1.getLatestCommittedOffsetToken() != null
&& channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) {
return;
} else {
Thread.sleep(2000);
}
}
Assert.fail("Row sequencer not updated before timeout");
}

@Test
public void testNullableColumns() throws Exception {
String multiTableName = "multi_column";
Expand Down

0 comments on commit 4450fc4

Please sign in to comment.