From ecb4f513e7c38423ceb99a8052c2b988e12e7cd0 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 20 Nov 2024 02:25:04 -0800 Subject: [PATCH] fix test --- ...nowflakeStreamingIngestClientInternal.java | 36 +++++++------------ .../internal/it/SubscopedTokenRefreshIT.java | 21 ++++++++--- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 6efa6fbfc..f10f683c3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -157,7 +157,8 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values */ - SnowflakeStreamingIngestClientInternal( + @VisibleForTesting + public SnowflakeStreamingIngestClientInternal( String name, SnowflakeURL accountURL, Properties prop, @@ -219,14 +220,16 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea prop.getProperty(Constants.OAUTH_REFRESH_TOKEN), oAuthTokenEndpoint); } - this.requestBuilder = - new RequestBuilder( - accountURL, - prop.get(USER).toString(), - credential, - this.httpClient, - parameterProvider.isEnableIcebergStreaming(), - String.format("%s_%s", this.name, System.currentTimeMillis())); + if (this.requestBuilder == null) { + this.requestBuilder = + new RequestBuilder( + accountURL, + prop.get(USER).toString(), + credential, + this.httpClient, + parameterProvider.isEnableIcebergStreaming(), + String.format("%s_%s", this.name, System.currentTimeMillis())); + } logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType()); } @@ -293,21 +296,6 @@ public SnowflakeStreamingIngestClientInternal( public void injectRequestBuilder(RequestBuilder requestBuilder) { this.requestBuilder = requestBuilder; this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); - this.storageManager = - parameterProvider.isEnableIcebergStreaming() - ? new SubscopedTokenExternalVolumeManager( - this.role, this.name, this.snowflakeServiceClient) - : new InternalStageManager( - isTestMode, this.role, this.name, this.snowflakeServiceClient); - - try { - this.flushService = - new FlushService<>(this, this.channelCache, this.storageManager, this.isTestMode); - } catch (Exception e) { - // Need to clean up the resources before throwing any exceptions - cleanUpResources(); - throw e; - } } /** diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java index 432fb0ff5..7b9319985 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java @@ -4,9 +4,9 @@ package net.snowflake.ingest.streaming.internal.it; -import static net.snowflake.ingest.utils.Constants.IcebergSerializationPolicy.OPTIMIZED; import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT; import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; import com.google.common.collect.ImmutableMap; import java.sql.Connection; @@ -20,6 +20,7 @@ import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.HttpUtil; import net.snowflake.ingest.utils.SnowflakeURL; +import net.snowflake.ingest.utils.Utils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,9 +43,15 @@ public void before() throws Exception { conn = TestUtils.getConnection(true); + conn.createStatement().execute(String.format("create or replace database %s;", database)); + conn.createStatement().execute(String.format("use database %s;", database)); + conn.createStatement().execute(String.format("use schema %s;", schema)); + conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); + SnowflakeURL url = new SnowflakeURL(TestUtils.getAccountURL()); Properties properties = TestUtils.getProperties(Constants.BdecVersion.THREE, false); properties.setProperty(ENABLE_ICEBERG_STREAMING, "true"); + properties.setProperty(MAX_CLIENT_LAG, "1000"); requestBuilder = Mockito.spy( new RequestBuilder( @@ -55,10 +62,14 @@ public void before() throws Exception { true /* enableIcebergStreaming */, "client1")); this.client = - (SnowflakeStreamingIngestClientInternal) - TestUtils.setUp( - conn, database, schema, true /* enableIcebergStreaming */, "ZSTD", OPTIMIZED); - client.injectRequestBuilder(requestBuilder); + new SnowflakeStreamingIngestClientInternal<>( + "client1", + url, + Utils.createProperties(properties), + HttpUtil.getHttpClient(url.getAccount()), + false /* isTestMode */, + requestBuilder, + null /* parameterOverrides */); } @After