From 8d2870b8bdd308237643b0d6ac340eda53518e4f Mon Sep 17 00:00:00 2001
From: Alec Huang <alec.huang@snowflake.com>
Date: Wed, 12 Jun 2024 21:05:55 +0000
Subject: [PATCH] Add FlushServiceTest in IcebergMode

---
 .../streaming/internal/FlushService.java      |  26 ++--
 .../streaming/internal/FlushServiceTest.java  | 113 ++++++++++--------
 2 files changed, 83 insertions(+), 56 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index dc5635ffb..b47059324 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -137,23 +137,36 @@ List<List<ChannelData<T>>> getData() {
   private String clientPrefix;
 
   /**
-   * Constructor for TESTING that takes (usually mocked) StreamingIngestStage
+   * Constructor for TESTING that takes (usually mocked) StreamingIngestStage or external volume map
    *
    * @param client
    * @param cache
+   * @param targetStageOrExtervalVolumeMap
+   * @param isIcebergMode
    * @param isTestMode
    */
   FlushService(
       SnowflakeStreamingIngestClientInternal<T> client,
       ChannelCache<T> cache,
-      StreamingIngestStage targetStage, // For testing
+      Object targetStageOrExtervalVolumeMap, // For testing
       boolean isIcebergMode,
       boolean isTestMode) {
     this.owningClient = client;
     this.channelCache = cache;
-    this.targetStage = targetStage;
-    if (this.targetStage != null) {
-      this.clientPrefix = this.targetStage.getClientPrefix();
+    if (!isIcebergMode) {
+      this.targetStage = (StreamingIngestStage) targetStageOrExtervalVolumeMap;
+      this.externalVolumeMap = null;
+      if (this.targetStage != null) {
+        this.clientPrefix = this.targetStage.getClientPrefix();
+      }
+    } else {
+      this.targetStage = null;
+      this.externalVolumeMap =
+          (Map<String, StreamingIngestExternalVolume>) targetStageOrExtervalVolumeMap;
+      if (externalVolumeMap != null && !externalVolumeMap.isEmpty()) {
+        this.clientPrefix =
+            externalVolumeMap.entrySet().iterator().next().getValue().getClientPrefix();
+      }
     }
     this.counter = new AtomicLong(0);
     this.registerService = new RegisterService<>(client, isTestMode);
@@ -161,7 +174,6 @@ List<List<ChannelData<T>>> getData() {
     this.lastFlushTime = System.currentTimeMillis();
     this.isIcebergMode = isIcebergMode;
     this.isTestMode = isTestMode;
-    this.externalVolumeMap = isIcebergMode ? new ConcurrentHashMap<>() : null;
     this.latencyTimerContextMap = new ConcurrentHashMap<>();
     this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
     createWorkers();
@@ -724,7 +736,7 @@ void setNeedFlush() {
    * thread id + counter>.BDEC". Blob path of an external volume is: "snow_<volume hash + current
    * utc timestamp + client unique prefix + thread id + counter>.parquet".
    *
-   * @param volumeHash volume hash of an external volume, only works when isIcebergMode is true
+   * @param volumeHash volume hash of an external volume, only matters when isIcebergMode is true
    * @return the generated blob file path
    */
   private String getBlobPath(String volumeHash) {
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index e2ec3c34d..08dda05a5 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -1,12 +1,6 @@
 package net.snowflake.ingest.streaming.internal;
 
-import static net.snowflake.ingest.utils.Constants.BLOB_CHECKSUM_SIZE_IN_BYTES;
-import static net.snowflake.ingest.utils.Constants.BLOB_CHUNK_METADATA_LENGTH_SIZE_IN_BYTES;
-import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;
-import static net.snowflake.ingest.utils.Constants.BLOB_FILE_SIZE_SIZE_IN_BYTES;
-import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
-import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES;
-import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES;
+import static net.snowflake.ingest.utils.Constants.*;
 import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;
 
 import com.codahale.metrics.Histogram;
@@ -25,17 +19,7 @@
 import java.security.NoSuchAlgorithmException;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import javax.crypto.BadPaddingException;
 import javax.crypto.IllegalBlockSizeException;
@@ -57,10 +41,9 @@
 @RunWith(Parameterized.class)
 public class FlushServiceTest {
 
-  // TODO: Add IcebergMode = True
   @Parameterized.Parameters(name = "isIcebergMode: {0}")
   public static Object[] isIcebergMode() {
-    return new Object[] {false};
+    return new Object[] {false, true};
   }
 
   @Parameterized.Parameter public static boolean isIcebergMode;
@@ -85,9 +68,14 @@ public String toString() {
   }
 
   private abstract static class TestContext<T> implements AutoCloseable {
+    static final String clientPrefix = "client_prefix";
+    static final String volumeHash = "volume_hash";
+
     SnowflakeStreamingIngestClientInternal<T> client;
     ChannelCache<T> channelCache;
     final Map<String, SnowflakeStreamingIngestChannelInternal<T>> channels = new HashMap<>();
+    StreamingIngestExternalVolume externalVolume;
+    Map<String, StreamingIngestExternalVolume> externalVolumeMap;
     FlushService<T> flushService;
     StreamingIngestStage stage;
     ParameterProvider parameterProvider;
@@ -97,7 +85,14 @@ private abstract static class TestContext<T> implements AutoCloseable {
 
     TestContext(boolean isIcebergMode) {
       stage = Mockito.mock(StreamingIngestStage.class);
-      Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
+      Mockito.when(stage.getClientPrefix()).thenReturn(clientPrefix);
+      externalVolume = Mockito.mock(StreamingIngestExternalVolume.class);
+      Mockito.when(externalVolume.getClientPrefix()).thenReturn(clientPrefix);
+      externalVolumeMap = Mockito.mock(Map.class);
+      Mockito.when(externalVolumeMap.get(Mockito.any(String.class))).thenReturn(externalVolume);
+      Mockito.doReturn(Collections.singleton(new AbstractMap.SimpleEntry<>("_", externalVolume)))
+          .when(externalVolumeMap)
+          .entrySet();
       parameterProvider = new ParameterProvider(isIcebergMode);
       client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
       Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
@@ -105,7 +100,13 @@ private abstract static class TestContext<T> implements AutoCloseable {
       Mockito.when(client.getChannelCache()).thenReturn(channelCache);
       registerService = Mockito.spy(new RegisterService(client, client.isTestMode()));
       flushService =
-          Mockito.spy(new FlushService<>(client, channelCache, stage, isIcebergMode, true));
+          Mockito.spy(
+              new FlushService<>(
+                  client,
+                  channelCache,
+                  isIcebergMode ? externalVolumeMap : stage,
+                  isIcebergMode,
+                  true));
     }
 
     ChannelData<T> flushChannel(String name) {
@@ -408,32 +409,40 @@ public void testGetFilePath() {
     TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     FlushService<?> flushService = testContext.flushService;
     Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-    String outputString = flushService.getBlobPath(calendar, null);
+    String outputString = flushService.getBlobPath(calendar, TestContext.volumeHash);
     Path outputPath = Paths.get(outputString);
-    Assert.assertTrue(outputPath.getFileName().toString().contains("client_prefix"));
-    Assert.assertTrue(
-        calendar.get(Calendar.MINUTE)
-                - Integer.parseInt(outputPath.getParent().getFileName().toString())
-            <= 1);
-    Assert.assertEquals(
-        Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)),
-        outputPath.getParent().getParent().getFileName().toString());
-    Assert.assertEquals(
-        Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)),
-        outputPath.getParent().getParent().getParent().getFileName().toString());
-    Assert.assertEquals(
-        Integer.toString(calendar.get(Calendar.MONTH) + 1),
-        outputPath.getParent().getParent().getParent().getParent().getFileName().toString());
-    Assert.assertEquals(
-        Integer.toString(calendar.get(Calendar.YEAR)),
-        outputPath
-            .getParent()
-            .getParent()
-            .getParent()
-            .getParent()
-            .getParent()
-            .getFileName()
-            .toString());
+    Assert.assertTrue(outputPath.getFileName().toString().contains(TestContext.clientPrefix));
+    if (!isIcebergMode) {
+      Assert.assertTrue(
+          calendar.get(Calendar.MINUTE)
+                  - Integer.parseInt(outputPath.getParent().getFileName().toString())
+              <= 1);
+      Assert.assertEquals(
+          Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)),
+          outputPath.getParent().getParent().getFileName().toString());
+      Assert.assertEquals(
+          Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)),
+          outputPath.getParent().getParent().getParent().getFileName().toString());
+      Assert.assertEquals(
+          Integer.toString(calendar.get(Calendar.MONTH) + 1),
+          outputPath.getParent().getParent().getParent().getParent().getFileName().toString());
+      Assert.assertEquals(
+          Integer.toString(calendar.get(Calendar.YEAR)),
+          outputPath
+              .getParent()
+              .getParent()
+              .getParent()
+              .getParent()
+              .getParent()
+              .getFileName()
+              .toString());
+      Assert.assertTrue(outputPath.getFileName().toString().endsWith(BLOB_EXTENSION_TYPE));
+    } else {
+      Assert.assertTrue(outputPath.getFileName().toString().startsWith(ICEBERG_MODE_BLOB_PREFIX));
+      Assert.assertTrue(outputPath.getFileName().toString().contains(TestContext.volumeHash));
+      Assert.assertTrue(
+          outputPath.getFileName().toString().endsWith(ICEBERG_MODE_BLOB_EXTENSION_TYPE));
+    }
   }
 
   @Test
@@ -935,9 +944,15 @@ public void testInvalidateChannels() {
     innerData.add(channel2Data);
 
     StreamingIngestStage stage = Mockito.mock(StreamingIngestStage.class);
-    Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
+    Mockito.when(stage.getClientPrefix()).thenReturn(TestContext.clientPrefix);
+    StreamingIngestExternalVolume externalVolume =
+        Mockito.mock(StreamingIngestExternalVolume.class);
+    Mockito.when(externalVolume.getClientPrefix()).thenReturn(TestContext.clientPrefix);
+    Map<String, StreamingIngestExternalVolume> externalVolumeMap = new HashMap<>();
+    externalVolumeMap.put("db1.schema1.table1", externalVolume);
     FlushService<StubChunkData> flushService =
-        new FlushService<>(client, channelCache, stage, isIcebergMode, false);
+        new FlushService<>(
+            client, channelCache, isIcebergMode ? externalVolumeMap : stage, isIcebergMode, false);
     flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test");
 
     Assert.assertFalse(channel1.isValid());