Skip to content

Commit

Permalink
fix SnowflakeServiceClient test code to allow per-API overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-hmadan committed Sep 23, 2024
1 parent 3f9b959 commit a2d4c6b
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 492 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
String.format("%s_%s", this.name, System.currentTimeMillis()));

logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType());
}

if (this.requestBuilder != null) {
// Setup client telemetries if needed
this.setupMetricsForClient();
}
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/net/snowflake/ingest/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
import java.sql.Connection;
import java.sql.DriverManager;
Expand Down Expand Up @@ -102,7 +104,7 @@ public class TestUtils {
*
* @throws IOException if can't read profile
*/
private static void init() throws Exception {
private static void init() throws NoSuchAlgorithmException, InvalidKeySpecException, IOException {
String testProfilePath = getTestProfilePath();
Path path = Paths.get(testProfilePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import static java.time.ZoneOffset.UTC;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -39,7 +42,12 @@ public static Object[] isIcebergMode() {
@Before
public void setup() {
cache = new ChannelCache<>();
client = new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
client =
new SnowflakeStreamingIngestClientInternal<>(
"client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>());

channel1 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ private abstract static class TestContext<T> implements AutoCloseable {
FlushService<T> flushService;
IStorageManager storageManager;
InternalStage storage;
ExternalVolume extVolume;
ParameterProvider parameterProvider;
RegisterService registerService;

final List<ChannelData<T>> channelData = new ArrayList<>();

TestContext() {
storage = Mockito.mock(InternalStage.class);
extVolume = Mockito.mock(ExternalVolume.class);
parameterProvider = new ParameterProvider(isIcebergMode);
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(isIcebergMode);
Expand All @@ -113,9 +115,12 @@ private abstract static class TestContext<T> implements AutoCloseable {
storageManager =
Mockito.spy(
isIcebergMode
? new ExternalVolumeManager(true, "role", "client", null)
? new ExternalVolumeManager(
true, "role", "client", MockSnowflakeServiceClient.create())
: new InternalStageManager(true, "role", "client", null));
Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any());
Mockito.doReturn(isIcebergMode ? extVolume : storage)
.when(storageManager)
.getStorage(ArgumentMatchers.any());
Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix");
Mockito.when(client.getParameterProvider())
.thenAnswer((Answer<ParameterProvider>) (i) -> parameterProvider);
Expand Down Expand Up @@ -425,6 +430,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) {

@Test
public void testGetFilePath() {
// SNOW-1490151 Iceberg testing gaps
if (isIcebergMode) {
// TODO: SNOW-1502887 Blob path generation for iceberg table
return;
Expand Down Expand Up @@ -623,6 +629,7 @@ public void testBlobCreation() throws Exception {
FlushService<?> flushService = testContext.flushService;

// Force = true flushes
// SNOW-1490151 Iceberg testing gaps
if (!isIcebergMode) {
flushService.flush(true).get();
Mockito.verify(flushService, Mockito.atLeast(2))
Expand Down Expand Up @@ -674,6 +681,7 @@ public void testBlobSplitDueToDifferentSchema() throws Exception {

FlushService<?> flushService = testContext.flushService;

// SNOW-1490151 Iceberg testing gaps
if (!isIcebergMode) {
// Force = true flushes
flushService.flush(true).get();
Expand Down Expand Up @@ -711,6 +719,7 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception {

FlushService<?> flushService = testContext.flushService;

// SNOW-1490151 Iceberg testing gaps
if (!isIcebergMode) {
// Force = true flushes
flushService.flush(true).get();
Expand All @@ -721,6 +730,7 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception {

@Test
public void testBlobSplitDueToNumberOfChunks() throws Exception {
// SNOW-1490151 Iceberg testing gaps
if (isIcebergMode) {
return;
}
Expand Down Expand Up @@ -799,6 +809,7 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except
channel3.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
channel3.insertRow(Collections.singletonMap("C1", 0), "");

// SNOW-1490151 Iceberg testing gaps
if (isIcebergMode) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Utils;
Expand Down Expand Up @@ -50,9 +52,19 @@ public static Object[] isIcebergMode() {
@Setup(Level.Trial)
public void setUpBeforeAll() {
// SNOW-1490151: Testing gaps
CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
client =
new SnowflakeStreamingIngestClientInternal<ParquetChunkData>(
"client_PARQUET", isIcebergMode);
new SnowflakeStreamingIngestClientInternal<>(
"client_PARQUET",
null,
null,
httpClient,
isIcebergMode,
true,
requestBuilder,
new HashMap<>());

channel =
new SnowflakeStreamingIngestChannelInternal<>(
"channel",
Expand Down
Loading

0 comments on commit a2d4c6b

Please sign in to comment.