From de6cb02192a66856ceb2a9ec7e4aba5da8860965 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 11 Nov 2024 15:59:06 -0800 Subject: [PATCH] Introduce Iceberg streaming merge gate (#898) --- .github/workflows/End2EndTest.yml | 66 ++++++++++++++++++- pom.xml | 4 ++ scripts/run_gh_actions.sh | 5 ++ .../java/net/snowflake/ingest/IcebergIT.java | 8 +++ .../internal/datatypes/IcebergDateTimeIT.java | 5 +- .../datatypes/IcebergLogicalTypesIT.java | 9 ++- .../datatypes/IcebergNumericTypesIT.java | 5 +- .../internal/datatypes/IcebergStringIT.java | 21 ++++-- .../datatypes/IcebergStructuredIT.java | 5 +- .../internal/it/IcebergColumnNamesIT.java | 5 +- 10 files changed, 114 insertions(+), 19 deletions(-) create mode 100644 src/test/java/net/snowflake/ingest/IcebergIT.java diff --git a/.github/workflows/End2EndTest.yml b/.github/workflows/End2EndTest.yml index 0a47d7bd1..78c0c28ca 100644 --- a/.github/workflows/End2EndTest.yml +++ b/.github/workflows/End2EndTest.yml @@ -37,7 +37,7 @@ jobs: WHITESOURCE_API_KEY: ${{ secrets.WHITESOURCE_API_KEY }} continue-on-error: false run: | - ./scripts/run_gh_actions.sh + ./scripts/run_gh_actions.sh -Dfailsafe.excludedGroups="net.snowflake.ingest.IcebergIT" - name: Code Coverage uses: codecov/codecov-action@v1 @@ -66,7 +66,69 @@ jobs: ./scripts/decrypt_secret_windows.ps1 -SnowflakeDeployment '${{ matrix.snowflake_cloud }}' - name: Unit & Integration Test (Windows) continue-on-error: false - run: mvn -DghActionsIT verify --batch-mode + run: | + mvn -DghActionsIT -D"failsafe.excludedGroups"="net.snowflake.ingest.IcebergIT" verify --batch-mode + build-iceberg: + name: Build & Test Streaming Iceberg - JDK ${{ matrix.java }}, Cloud ${{ matrix.snowflake_cloud }} + runs-on: ubuntu-20.04 + strategy: + fail-fast: false # https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstrategyfail-fast + matrix: + java: [ 8 ] + snowflake_cloud: [ 'AWS' ] + steps: + - name: Checkout Code + uses: actions/checkout@v2 + - name: Install Java ${{ matrix.java }} + uses: actions/setup-java@v2 + with: + distribution: temurin + java-version: ${{ matrix.java }} + cache: maven + + - name: Decrypt profile.json for Cloud ${{ matrix.snowflake_cloud }} + env: + DECRYPTION_PASSPHRASE: ${{ secrets.PROFILE_JSON_DECRYPT_PASSPHRASE }} + run: | + ./scripts/decrypt_secret.sh ${{ matrix.snowflake_cloud }} + + - name: Unit & Integration Test against Cloud ${{ matrix.snowflake_cloud }} + env: + JACOCO_COVERAGE: true + WHITESOURCE_API_KEY: ${{ secrets.WHITESOURCE_API_KEY }} + continue-on-error: false + run: | + ./scripts/run_gh_actions.sh -Dfailsafe.groups="net.snowflake.ingest.IcebergIT" + + - name: Code Coverage + uses: codecov/codecov-action@v1 + build-iceberg-windows: + name: Build & Test - Streaming Iceberg Windows, JDK ${{ matrix.java }}, Cloud ${{ matrix.snowflake_cloud }} + runs-on: windows-2022 + strategy: + fail-fast: false + matrix: + java: [ 8 ] + snowflake_cloud: [ 'AWS' ] + steps: + - name: Checkout Code + uses: actions/checkout@v2 + - name: Install Java ${{ matrix.java }} + uses: actions/setup-java@v2 + with: + distribution: temurin + java-version: ${{ matrix.java }} + cache: maven + - name: Decrypt profile.json for Cloud ${{ matrix.snowflake_cloud }} on Windows Powershell + env: + DECRYPTION_PASSPHRASE: ${{ secrets.PROFILE_JSON_DECRYPT_PASSPHRASE }} + shell: pwsh + run: | + ./scripts/decrypt_secret_windows.ps1 -SnowflakeDeployment '${{ matrix.snowflake_cloud }}' + - name: Unit & Integration Test (Windows) + continue-on-error: false + run: | + mvn -DghActionsIT -"Dfailsafe.groups"="net.snowflake.ingest.IcebergIT" verify --batch-mode build-e2e-jar-test: name: e2e-jar-test cloud=${{ matrix.snowflake_cloud }} test_type=${{ matrix.test_type }} java=${{ matrix.java_path_env_var }} runs-on: ubuntu-20.04 diff --git a/pom.xml b/pom.xml index 92caf6074..db6980504 100644 --- a/pom.xml +++ b/pom.xml @@ -1406,6 +1406,10 @@ org.apache.maven.plugins maven-failsafe-plugin + + ${failsafe.groups} + ${failsafe.excludedGroups} + diff --git a/scripts/run_gh_actions.sh b/scripts/run_gh_actions.sh index 49895010b..c6f9f99ed 100755 --- a/scripts/run_gh_actions.sh +++ b/scripts/run_gh_actions.sh @@ -1,5 +1,9 @@ #!/bin/bash -e +# +# Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. +# + set -o pipefail # Build and install shaded JAR first. check_content.sh runs here. @@ -9,6 +13,7 @@ PARAMS=() PARAMS+=("-DghActionsIT") # testing will not need shade dep. otherwise codecov cannot work PARAMS+=("-Dnot-shadeDep") +PARAMS+=($1) [[ -n "$JACOCO_COVERAGE" ]] && PARAMS+=("-Djacoco.skip.instrument=false") # verify phase is after test/integration-test phase, which means both unit test # and integration test will be run diff --git a/src/test/java/net/snowflake/ingest/IcebergIT.java b/src/test/java/net/snowflake/ingest/IcebergIT.java new file mode 100644 index 000000000..1f6ffe101 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/IcebergIT.java @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest; + +/** Interface for Iceberg Integration Tests groups */ +public interface IcebergIT {} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java index eeb0cee4b..db17d68ec 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java @@ -14,17 +14,18 @@ import java.time.OffsetTime; import java.time.ZoneOffset; import java.util.Arrays; +import net.snowflake.ingest.IcebergIT; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -@Ignore("This test can be enabled after server side Iceberg EP support is released") +@Category(IcebergIT.class) @RunWith(Parameterized.class) public class IcebergDateTimeIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java index e6d54418e..63421d68c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java @@ -1,17 +1,22 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal.datatypes; import java.util.Arrays; +import net.snowflake.ingest.IcebergIT; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -@Ignore("This test can be enabled after server side Iceberg EP support is released") +@Category(IcebergIT.class) @RunWith(Parameterized.class) public class IcebergLogicalTypesIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java index d5d436f6f..21e28fc87 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java @@ -9,20 +9,21 @@ import java.util.Arrays; import java.util.List; import java.util.Random; +import net.snowflake.ingest.IcebergIT; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.text.RandomStringGenerator; import org.assertj.core.api.Assertions; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Ignore("This test can be enabled after server side Iceberg EP support is released") +@Category(IcebergIT.class) @RunWith(Parameterized.class) public class IcebergNumericTypesIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java index 2e0989994..669edeb21 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java @@ -1,19 +1,24 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal.datatypes; import java.math.BigDecimal; import java.util.Arrays; +import net.snowflake.ingest.IcebergIT; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.lang3.StringUtils; import org.assertj.core.api.Assertions; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -@Ignore("This test can be enabled after server side Iceberg EP support is released") +@Category(IcebergIT.class) @RunWith(Parameterized.class) public class IcebergStringIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") @@ -84,15 +89,17 @@ public void testStringAndQueries() throws Exception { testIcebergIngestAndQuery( "string", Arrays.asList(null, null, null, null, "aaa"), "select COUNT(*) from {tableName} where {columnName} is null", Arrays.asList(4L)); - testIcebergIngestAndQuery( - "string", - Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024), null, null, null, "aaa"), - "select MAX({columnName}) from {tableName}", - Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024))); testIcebergIngestAndQuery( "string", Arrays.asList(StringUtils.repeat("a", 33), StringUtils.repeat("*", 3), null, ""), "select MAX(LENGTH({columnName})) from {tableName}", Arrays.asList(33L)); + + // TODO: Change to 16MB after SNOW-1798403 fixed + testIcebergIngestAndQuery( + "string", + Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024 - 1), null, null, null, "aaa"), + "select MAX({columnName}) from {tableName}", + Arrays.asList(StringUtils.repeat("a", 16 * 1024 * 1024 - 1))); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 8d3071518..2995d1dc9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import net.snowflake.ingest.IcebergIT; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; @@ -24,12 +25,12 @@ import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -@Ignore("This test can be enabled after server side Iceberg EP support is released") +@Category(IcebergIT.class) @RunWith(Parameterized.class) public class IcebergStructuredIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergColumnNamesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergColumnNamesIT.java index 8051a9467..58e368fc1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergColumnNamesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergColumnNamesIT.java @@ -4,11 +4,12 @@ package net.snowflake.ingest.streaming.internal.it; +import net.snowflake.ingest.IcebergIT; import net.snowflake.ingest.utils.Constants.IcebergSerializationPolicy; import org.junit.Before; -import org.junit.Ignore; +import org.junit.experimental.categories.Category; -@Ignore("Enable this after the Iceberg testing on GCS / Azure is ready") +@Category(IcebergIT.class) public class IcebergColumnNamesIT extends ColumnNamesITBase { @Before public void before() throws Exception {