From ef46759d144ce8719c6b5e839673b741f712e2f8 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 19 Nov 2024 16:47:37 -0800 Subject: [PATCH] done --- ...nowflakeStreamingIngestClientInternal.java | 17 +- .../internal/it/SubscopedTokenRefreshIT.java | 149 ++++++++++++++++++ 2 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 4bbcc9bc9..6efa6fbfc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -108,7 +108,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea private final ChannelCache channelCache; // Reference to the flush service - private final FlushService flushService; + private FlushService flushService; // Reference to storage manager private IStorageManager storageManager; @@ -293,6 +293,21 @@ public SnowflakeStreamingIngestClientInternal( public void injectRequestBuilder(RequestBuilder requestBuilder) { this.requestBuilder = requestBuilder; this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); + this.storageManager = + parameterProvider.isEnableIcebergStreaming() + ? new SubscopedTokenExternalVolumeManager( + this.role, this.name, this.snowflakeServiceClient) + : new InternalStageManager( + isTestMode, this.role, this.name, this.snowflakeServiceClient); + + try { + this.flushService = + new FlushService<>(this, this.channelCache, this.storageManager, this.isTestMode); + } catch (Exception e) { + // Need to clean up the resources before throwing any exceptions + cleanUpResources(); + throw e; + } } /** diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java new file mode 100644 index 000000000..432fb0ff5 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal.it; + +import static net.snowflake.ingest.utils.Constants.IcebergSerializationPolicy.OPTIMIZED; +import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; + +import com.google.common.collect.ImmutableMap; +import java.sql.Connection; +import java.util.Properties; +import net.snowflake.ingest.IcebergIT; +import net.snowflake.ingest.TestUtils; +import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.HttpUtil; +import net.snowflake.ingest.utils.SnowflakeURL; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(IcebergIT.class) +public class SubscopedTokenRefreshIT { + + private String database; + private String schema; + private Connection conn; + private SnowflakeStreamingIngestClientInternal client; + private RequestBuilder requestBuilder; + + @Before + public void before() throws Exception { + database = String.format("SDK_TOKEN_EXPIRE_IT_DB_%d", System.nanoTime()); + schema = "PUBLIC"; + + conn = TestUtils.getConnection(true); + + SnowflakeURL url = new SnowflakeURL(TestUtils.getAccountURL()); + Properties properties = TestUtils.getProperties(Constants.BdecVersion.THREE, false); + properties.setProperty(ENABLE_ICEBERG_STREAMING, "true"); + requestBuilder = + Mockito.spy( + new RequestBuilder( + url, + TestUtils.getUser(), + TestUtils.getKeyPair(), + HttpUtil.getHttpClient(url.getAccount()), + true /* enableIcebergStreaming */, + "client1")); + this.client = + (SnowflakeStreamingIngestClientInternal) + TestUtils.setUp( + conn, database, schema, true /* enableIcebergStreaming */, "ZSTD", OPTIMIZED); + client.injectRequestBuilder(requestBuilder); + } + + @After + public void after() throws Exception { + conn.createStatement().execute(String.format("drop database if exists %s;", database)); + } + + @Test + public void testTokenExpire() throws Exception { + String tableName = "test_token_expire_table"; + + /* + * Minimum duration of token for each cloud storage: + * - S3: 900 seconds + * - GCS: 600 seconds + * - Azure: 300 seconds + */ + int duration = 900; + createIcebergTable(tableName); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_S3=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_GCS=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_AZURE=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_DEFAULT=%s", + tableName, duration)); + + SnowflakeStreamingIngestChannel channel = + client.openChannel( + OpenChannelRequest.builder("CHANNEL") + .setDBName(database) + .setSchemaName(schema) + .setTableName(tableName) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.ABORT) + .build()); + + /* Refresh table information should be called once channel is opened */ + Mockito.verify(requestBuilder, Mockito.times(1)) + .generateStreamingIngestPostRequest( + Mockito.anyString(), + Mockito.eq(REFRESH_TABLE_INFORMATION_ENDPOINT), + Mockito.eq("refresh table information")); + + channel.insertRow(ImmutableMap.of("int_col", 1), "1"); + TestUtils.waitForOffset(channel, "1"); + + /* Wait for token to expire */ + + Thread.sleep((duration + 1) * 1000); + + /* Insert a row to trigger token generation */ + channel.insertRow(ImmutableMap.of("int_col", 2), "2"); + TestUtils.waitForOffset(channel, "2"); + Mockito.verify(requestBuilder, Mockito.times(2)) + .generateStreamingIngestPostRequest( + Mockito.anyString(), + Mockito.eq(REFRESH_TABLE_INFORMATION_ENDPOINT), + Mockito.eq("refresh table information")); + } + + private void createIcebergTable(String tableName) throws Exception { + conn.createStatement() + .execute( + String.format( + "create or replace iceberg table %s(int_col int)" + + "catalog = 'SNOWFLAKE' " + + "external_volume = 'streaming_ingest' " + + "base_location = 'SDK_IT/%s/%s'", + tableName, database, tableName)); + } +}