Skip to content

Commit

Permalink
Introduce Iceberg streaming merge gate (#898)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang authored Nov 11, 2024
1 parent f209e3c commit de6cb02
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 19 deletions.
66 changes: 64 additions & 2 deletions .github/workflows/End2EndTest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
WHITESOURCE_API_KEY: ${{ secrets.WHITESOURCE_API_KEY }}
continue-on-error: false
run: |
./scripts/run_gh_actions.sh
./scripts/run_gh_actions.sh -Dfailsafe.excludedGroups="net.snowflake.ingest.IcebergIT"
- name: Code Coverage
uses: codecov/codecov-action@v1
Expand Down Expand Up @@ -66,7 +66,69 @@ jobs:
./scripts/decrypt_secret_windows.ps1 -SnowflakeDeployment '${{ matrix.snowflake_cloud }}'
- name: Unit & Integration Test (Windows)
continue-on-error: false
run: mvn -DghActionsIT verify --batch-mode
run: |
mvn -DghActionsIT -D"failsafe.excludedGroups"="net.snowflake.ingest.IcebergIT" verify --batch-mode
build-iceberg:
name: Build & Test Streaming Iceberg - JDK ${{ matrix.java }}, Cloud ${{ matrix.snowflake_cloud }}
runs-on: ubuntu-20.04
strategy:
fail-fast: false # https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstrategyfail-fast
matrix:
java: [ 8 ]
snowflake_cloud: [ 'AWS' ]
steps:
- name: Checkout Code
uses: actions/checkout@v2
- name: Install Java ${{ matrix.java }}
uses: actions/setup-java@v2
with:
distribution: temurin
java-version: ${{ matrix.java }}
cache: maven

- name: Decrypt profile.json for Cloud ${{ matrix.snowflake_cloud }}
env:
DECRYPTION_PASSPHRASE: ${{ secrets.PROFILE_JSON_DECRYPT_PASSPHRASE }}
run: |
./scripts/decrypt_secret.sh ${{ matrix.snowflake_cloud }}
- name: Unit & Integration Test against Cloud ${{ matrix.snowflake_cloud }}
env:
JACOCO_COVERAGE: true
WHITESOURCE_API_KEY: ${{ secrets.WHITESOURCE_API_KEY }}
continue-on-error: false
run: |
./scripts/run_gh_actions.sh -Dfailsafe.groups="net.snowflake.ingest.IcebergIT"
- name: Code Coverage
uses: codecov/codecov-action@v1
build-iceberg-windows:
name: Build & Test - Streaming Iceberg Windows, JDK ${{ matrix.java }}, Cloud ${{ matrix.snowflake_cloud }}
runs-on: windows-2022
strategy:
fail-fast: false
matrix:
java: [ 8 ]
snowflake_cloud: [ 'AWS' ]
steps:
- name: Checkout Code
uses: actions/checkout@v2
- name: Install Java ${{ matrix.java }}
uses: actions/setup-java@v2
with:
distribution: temurin
java-version: ${{ matrix.java }}
cache: maven
- name: Decrypt profile.json for Cloud ${{ matrix.snowflake_cloud }} on Windows Powershell
env:
DECRYPTION_PASSPHRASE: ${{ secrets.PROFILE_JSON_DECRYPT_PASSPHRASE }}
shell: pwsh
run: |
./scripts/decrypt_secret_windows.ps1 -SnowflakeDeployment '${{ matrix.snowflake_cloud }}'
- name: Unit & Integration Test (Windows)
continue-on-error: false
run: |
mvn -DghActionsIT -"Dfailsafe.groups"="net.snowflake.ingest.IcebergIT" verify --batch-mode
build-e2e-jar-test:
name: e2e-jar-test cloud=${{ matrix.snowflake_cloud }} test_type=${{ matrix.test_type }} java=${{ matrix.java_path_env_var }}
runs-on: ubuntu-20.04
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<groups>${failsafe.groups}</groups>
<excludedGroups>${failsafe.excludedGroups}</excludedGroups>
</configuration>
<executions>
<execution>
<goals>
Expand Down
5 changes: 5 additions & 0 deletions scripts/run_gh_actions.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/bin/bash -e

#
# Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
#

set -o pipefail

# Build and install shaded JAR first. check_content.sh runs here.
Expand All @@ -9,6 +13,7 @@ PARAMS=()
PARAMS+=("-DghActionsIT")
# testing will not need shade dep. otherwise codecov cannot work
PARAMS+=("-Dnot-shadeDep")
PARAMS+=($1)
[[ -n "$JACOCO_COVERAGE" ]] && PARAMS+=("-Djacoco.skip.instrument=false")
# verify phase is after test/integration-test phase, which means both unit test
# and integration test will be run
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/net/snowflake/ingest/IcebergIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest;

/** Interface for Iceberg Integration Tests groups */
public interface IcebergIT {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@Category(IcebergIT.class)
@RunWith(Parameterized.class)
public class IcebergDateTimeIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.datatypes;

import java.util.Arrays;
import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@Category(IcebergIT.class)
@RunWith(Parameterized.class)
public class IcebergLogicalTypesIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.text.RandomStringGenerator;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@Category(IcebergIT.class)
@RunWith(Parameterized.class)
public class IcebergNumericTypesIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.datatypes;

import java.math.BigDecimal;
import java.util.Arrays;
import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@Category(IcebergIT.class)
@RunWith(Parameterized.class)
public class IcebergStringIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
Expand Down Expand Up @@ -84,15 +89,17 @@ public void testStringAndQueries() throws Exception {
testIcebergIngestAndQuery(
"string", Arrays.asList(null, null, null, null, "aaa"),
"select COUNT(*) from {tableName} where {columnName} is null", Arrays.asList(4L));
testIcebergIngestAndQuery(
"string",
Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024), null, null, null, "aaa"),
"select MAX({columnName}) from {tableName}",
Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024)));
testIcebergIngestAndQuery(
"string",
Arrays.asList(StringUtils.repeat("a", 33), StringUtils.repeat("*", 3), null, ""),
"select MAX(LENGTH({columnName})) from {tableName}",
Arrays.asList(33L));

// TODO: Change to 16MB after SNOW-1798403 fixed
testIcebergIngestAndQuery(
"string",
Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024 - 1), null, null, null, "aaa"),
"select MAX({columnName}) from {tableName}",
Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024 - 1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.TestUtils;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
Expand All @@ -24,12 +25,12 @@
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@Category(IcebergIT.class)
@RunWith(Parameterized.class)
public class IcebergStructuredIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

package net.snowflake.ingest.streaming.internal.it;

import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.utils.Constants.IcebergSerializationPolicy;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.experimental.categories.Category;

@Ignore("Enable this after the Iceberg testing on GCS / Azure is ready")
@Category(IcebergIT.class)
public class IcebergColumnNamesIT extends ColumnNamesITBase {
@Before
public void before() throws Exception {
Expand Down

0 comments on commit de6cb02

Please sign in to comment.