Add FlushServiceTest in IcebergMode
sfc-gh-alhuang committed Jun 12, 2024
1 parent 61abbb5 commit 8d2870b
Showing 2 changed files with 83 additions and 56 deletions.
@@ -137,31 +137,43 @@ List<List<ChannelData<T>>> getData() {
private String clientPrefix;

/**
* Constructor for TESTING that takes (usually mocked) StreamingIngestStage
* Constructor for TESTING that takes (usually mocked) StreamingIngestStage or external volume map
*
* @param client the owning streaming ingest client
* @param cache the channel cache
* @param targetStageOrExternalVolumeMap the (usually mocked) internal stage, or the map of external volumes when running in Iceberg mode
* @param isIcebergMode whether the client is running in Iceberg mode
* @param isTestMode whether the service is created in test mode
*/
FlushService(
SnowflakeStreamingIngestClientInternal<T> client,
ChannelCache<T> cache,
StreamingIngestStage targetStage, // For testing
Object targetStageOrExternalVolumeMap, // For testing
boolean isIcebergMode,
boolean isTestMode) {
this.owningClient = client;
this.channelCache = cache;
this.targetStage = targetStage;
if (this.targetStage != null) {
this.clientPrefix = this.targetStage.getClientPrefix();
if (!isIcebergMode) {
this.targetStage = (StreamingIngestStage) targetStageOrExternalVolumeMap;
this.externalVolumeMap = null;
if (this.targetStage != null) {
this.clientPrefix = this.targetStage.getClientPrefix();
}
} else {
this.targetStage = null;
this.externalVolumeMap =
(Map<String, StreamingIngestExternalVolume>) targetStageOrExternalVolumeMap;
if (externalVolumeMap != null && !externalVolumeMap.isEmpty()) {
this.clientPrefix =
externalVolumeMap.entrySet().iterator().next().getValue().getClientPrefix();
}
}
this.counter = new AtomicLong(0);
this.registerService = new RegisterService<>(client, isTestMode);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isIcebergMode = isIcebergMode;
this.isTestMode = isTestMode;
this.externalVolumeMap = isIcebergMode ? new ConcurrentHashMap<>() : null;
this.latencyTimerContextMap = new ConcurrentHashMap<>();
this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
createWorkers();
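
Because the third constructor argument is now an untyped Object, the branch taken is selected purely by isIcebergMode. As a rough sketch (not part of the commit), this is how a test in the same package might construct both variants, mirroring the Mockito wiring FlushServiceTest uses below; client is a mocked SnowflakeStreamingIngestClientInternal and channelCache a ChannelCache, and all other names are illustrative:

// Sketch only: assumes the package-private internals of
// net.snowflake.ingest.streaming.internal and org.mockito.Mockito.
StreamingIngestStage stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");

StreamingIngestExternalVolume volume = Mockito.mock(StreamingIngestExternalVolume.class);
Mockito.when(volume.getClientPrefix()).thenReturn("client_prefix");
Map<String, StreamingIngestExternalVolume> volumeMap =
    Collections.singletonMap("db1.schema1.table1", volume);

// Non-Iceberg mode: the internal stage is passed in and clientPrefix is read from it.
FlushService<StubChunkData> stageBacked =
    new FlushService<>(client, channelCache, stage, /* isIcebergMode= */ false, /* isTestMode= */ true);

// Iceberg mode: the external volume map is passed in and clientPrefix is read
// from the first map entry.
FlushService<StubChunkData> volumeBacked =
    new FlushService<>(client, channelCache, volumeMap, /* isIcebergMode= */ true, /* isTestMode= */ true);
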
@@ -724,7 +736,7 @@ void setNeedFlush() {
* thread id + counter>.BDEC". Blob path of an external volume is: "snow_<volume hash + current
* utc timestamp + client unique prefix + thread id + counter>.parquet".
*
* @param volumeHash volume hash of an external volume, only works when isIcebergMode is true
* @param volumeHash volume hash of an external volume, only matters when isIcebergMode is true
* @return the generated blob file path
*/
private String getBlobPath(String volumeHash) {
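
To make the two formats described in the javadoc concrete, here is an illustrative (not authoritative) pair of generated names; the component order and the "snow_" prefix come from the javadoc, while the underscore separators and lowercase extensions are assumptions:

// Internal stage (non-Iceberg): time-partitioned directories, BDEC extension
//   <year>/<month>/<day_of_month>/<hour_of_day>/<minute>/<utc timestamp>_<client prefix>_<thread id>_<counter>.bdec
// External volume (Iceberg mode): flat file name, "snow_" prefix, Parquet extension
//   snow_<volume hash>_<utc timestamp>_<client prefix>_<thread id>_<counter>.parquet
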
@@ -1,12 +1,6 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.BLOB_CHECKSUM_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_CHUNK_METADATA_LENGTH_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;
import static net.snowflake.ingest.utils.Constants.BLOB_FILE_SIZE_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.*;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;

import com.codahale.metrics.Histogram;
@@ -25,17 +19,7 @@
import java.security.NoSuchAlgorithmException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
@@ -57,10 +41,9 @@
@RunWith(Parameterized.class)
public class FlushServiceTest {

// TODO: Add IcebergMode = True
@Parameterized.Parameters(name = "isIcebergMode: {0}")
public static Object[] isIcebergMode() {
return new Object[] {false};
return new Object[] {false, true};
}

@Parameterized.Parameter public static boolean isIcebergMode;
@@ -85,9 +68,14 @@ public String toString() {
}

private abstract static class TestContext<T> implements AutoCloseable {
static final String clientPrefix = "client_prefix";
static final String volumeHash = "volume_hash";

SnowflakeStreamingIngestClientInternal<T> client;
ChannelCache<T> channelCache;
final Map<String, SnowflakeStreamingIngestChannelInternal<T>> channels = new HashMap<>();
StreamingIngestExternalVolume externalVolume;
Map<String, StreamingIngestExternalVolume> externalVolumeMap;
FlushService<T> flushService;
StreamingIngestStage stage;
ParameterProvider parameterProvider;
@@ -97,15 +85,28 @@ private abstract static class TestContext<T> implements AutoCloseable {

TestContext(boolean isIcebergMode) {
stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
Mockito.when(stage.getClientPrefix()).thenReturn(clientPrefix);
externalVolume = Mockito.mock(StreamingIngestExternalVolume.class);
Mockito.when(externalVolume.getClientPrefix()).thenReturn(clientPrefix);
externalVolumeMap = Mockito.mock(Map.class);
Mockito.when(externalVolumeMap.get(Mockito.any(String.class))).thenReturn(externalVolume);
Mockito.doReturn(Collections.singleton(new AbstractMap.SimpleEntry<>("_", externalVolume)))
.when(externalVolumeMap)
.entrySet();
parameterProvider = new ParameterProvider(isIcebergMode);
client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
channelCache = new ChannelCache<>();
Mockito.when(client.getChannelCache()).thenReturn(channelCache);
registerService = Mockito.spy(new RegisterService(client, client.isTestMode()));
flushService =
Mockito.spy(new FlushService<>(client, channelCache, stage, isIcebergMode, true));
Mockito.spy(
new FlushService<>(
client,
channelCache,
isIcebergMode ? externalVolumeMap : stage,
isIcebergMode,
true));
}

ChannelData<T> flushChannel(String name) {
@@ -408,32 +409,40 @@ public void testGetFilePath() {
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
FlushService<?> flushService = testContext.flushService;
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
String outputString = flushService.getBlobPath(calendar, null);
String outputString = flushService.getBlobPath(calendar, TestContext.volumeHash);
Path outputPath = Paths.get(outputString);
Assert.assertTrue(outputPath.getFileName().toString().contains("client_prefix"));
Assert.assertTrue(
calendar.get(Calendar.MINUTE)
- Integer.parseInt(outputPath.getParent().getFileName().toString())
<= 1);
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)),
outputPath.getParent().getParent().getFileName().toString());
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)),
outputPath.getParent().getParent().getParent().getFileName().toString());
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.MONTH) + 1),
outputPath.getParent().getParent().getParent().getParent().getFileName().toString());
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.YEAR)),
outputPath
.getParent()
.getParent()
.getParent()
.getParent()
.getParent()
.getFileName()
.toString());
Assert.assertTrue(outputPath.getFileName().toString().contains(TestContext.clientPrefix));
if (!isIcebergMode) {
Assert.assertTrue(
calendar.get(Calendar.MINUTE)
- Integer.parseInt(outputPath.getParent().getFileName().toString())
<= 1);
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)),
outputPath.getParent().getParent().getFileName().toString());
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)),
outputPath.getParent().getParent().getParent().getFileName().toString());
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.MONTH) + 1),
outputPath.getParent().getParent().getParent().getParent().getFileName().toString());
Assert.assertEquals(
Integer.toString(calendar.get(Calendar.YEAR)),
outputPath
.getParent()
.getParent()
.getParent()
.getParent()
.getParent()
.getFileName()
.toString());
Assert.assertTrue(outputPath.getFileName().toString().endsWith(BLOB_EXTENSION_TYPE));
} else {
Assert.assertTrue(outputPath.getFileName().toString().startsWith(ICEBERG_MODE_BLOB_PREFIX));
Assert.assertTrue(outputPath.getFileName().toString().contains(TestContext.volumeHash));
Assert.assertTrue(
outputPath.getFileName().toString().endsWith(ICEBERG_MODE_BLOB_EXTENSION_TYPE));
}
}

@Test
@@ -935,9 +944,15 @@ public void testInvalidateChannels() {
innerData.add(channel2Data);

StreamingIngestStage stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
Mockito.when(stage.getClientPrefix()).thenReturn(TestContext.clientPrefix);
StreamingIngestExternalVolume externalVolume =
Mockito.mock(StreamingIngestExternalVolume.class);
Mockito.when(externalVolume.getClientPrefix()).thenReturn(TestContext.clientPrefix);
Map<String, StreamingIngestExternalVolume> externalVolumeMap = new HashMap<>();
externalVolumeMap.put("db1.schema1.table1", externalVolume);
FlushService<StubChunkData> flushService =
new FlushService<>(client, channelCache, stage, isIcebergMode, false);
new FlushService<>(
client, channelCache, isIcebergMode ? externalVolumeMap : stage, isIcebergMode, false);
flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test");

Assert.assertFalse(channel1.isValid());
