Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Nov 20, 2024
1 parent d916a2f commit ecb4f51
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
* @param requestBuilder http request builder
* @param parameterOverrides parameters we override in case we want to set different values
*/
SnowflakeStreamingIngestClientInternal(
@VisibleForTesting
public SnowflakeStreamingIngestClientInternal(
String name,
SnowflakeURL accountURL,
Properties prop,
Expand Down Expand Up @@ -219,14 +220,16 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
prop.getProperty(Constants.OAUTH_REFRESH_TOKEN),
oAuthTokenEndpoint);
}
this.requestBuilder =
new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
this.httpClient,
parameterProvider.isEnableIcebergStreaming(),
String.format("%s_%s", this.name, System.currentTimeMillis()));
if (this.requestBuilder == null) {
this.requestBuilder =
new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
this.httpClient,
parameterProvider.isEnableIcebergStreaming(),
String.format("%s_%s", this.name, System.currentTimeMillis()));
}

logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType());
}
Expand Down Expand Up @@ -293,21 +296,6 @@ public SnowflakeStreamingIngestClientInternal(
public void injectRequestBuilder(RequestBuilder requestBuilder) {
this.requestBuilder = requestBuilder;
this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder);
this.storageManager =
parameterProvider.isEnableIcebergStreaming()
? new SubscopedTokenExternalVolumeManager(
this.role, this.name, this.snowflakeServiceClient)
: new InternalStageManager(
isTestMode, this.role, this.name, this.snowflakeServiceClient);

try {
this.flushService =
new FlushService<>(this, this.channelCache, this.storageManager, this.isTestMode);
} catch (Exception e) {
// Need to clean up the resources before throwing any exceptions
cleanUpResources();
throw e;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package net.snowflake.ingest.streaming.internal.it;

import static net.snowflake.ingest.utils.Constants.IcebergSerializationPolicy.OPTIMIZED;
import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG;

import com.google.common.collect.ImmutableMap;
import java.sql.Connection;
Expand All @@ -20,6 +20,7 @@
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.HttpUtil;
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -42,9 +43,15 @@ public void before() throws Exception {

conn = TestUtils.getConnection(true);

conn.createStatement().execute(String.format("create or replace database %s;", database));
conn.createStatement().execute(String.format("use database %s;", database));
conn.createStatement().execute(String.format("use schema %s;", schema));
conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse()));

SnowflakeURL url = new SnowflakeURL(TestUtils.getAccountURL());
Properties properties = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
properties.setProperty(ENABLE_ICEBERG_STREAMING, "true");
properties.setProperty(MAX_CLIENT_LAG, "1000");
requestBuilder =
Mockito.spy(
new RequestBuilder(
Expand All @@ -55,10 +62,14 @@ public void before() throws Exception {
true /* enableIcebergStreaming */,
"client1"));
this.client =
(SnowflakeStreamingIngestClientInternal<?>)
TestUtils.setUp(
conn, database, schema, true /* enableIcebergStreaming */, "ZSTD", OPTIMIZED);
client.injectRequestBuilder(requestBuilder);
new SnowflakeStreamingIngestClientInternal<>(
"client1",
url,
Utils.createProperties(properties),
HttpUtil.getHttpClient(url.getAccount()),
false /* isTestMode */,
requestBuilder,
null /* parameterOverrides */);
}

@After
Expand Down

0 comments on commit ecb4f51

Please sign in to comment.