From 4584f1c8ce48ad8142326febe324f4746aef4761 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 25 Aug 2023 18:49:15 -0700 Subject: [PATCH] Destination snowflake: prevent data loss, by only running T+D once at end of sync (#29881) --- .../destination/staging/GeneralStagingFunctions.java | 10 ---------- .../connectors/destination-snowflake/Dockerfile | 2 +- .../connectors/destination-snowflake/metadata.yaml | 2 +- docs/integrations/destinations/snowflake.md | 1 + 4 files changed, 3 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/GeneralStagingFunctions.java b/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/GeneralStagingFunctions.java index d3c24b0a1166..301ee7d649f2 100644 --- a/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/GeneralStagingFunctions.java +++ b/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/GeneralStagingFunctions.java @@ -10,7 +10,6 @@ import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction; import io.airbyte.integrations.destination.jdbc.WriteConfig; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import java.util.ArrayList; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -80,15 +79,6 @@ public static void copyIntoTableFromStage(final JdbcDatabase database, try { stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles, tableName, schemaName); - - AirbyteStreamNameNamespacePair streamId = new AirbyteStreamNameNamespacePair(streamName, streamNamespace); - if (!typerDeduperValve.containsKey(streamId)) { - typerDeduperValve.addStream(streamId); - } - if (typerDeduperValve.readyToTypeAndDedupe(streamId)) { - typerDeduper.typeAndDedupe(streamId.getNamespace(), streamId.getName()); - typerDeduperValve.updateTimeAndIncreaseInterval(streamId); - } } catch (final Exception e) { stagingOperations.cleanUpStage(database, stageName, stagedFiles); log.info("Cleaning stage path {}", stagingPath); diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 011211b74ff2..f5b15634eddd 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=2.1.2 +LABEL io.airbyte.version=2.1.3 LABEL io.airbyte.name=airbyte/destination-snowflake ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh" diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index afb7e6e77439..54b819007784 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 2.1.2 + dockerImageTag: 2.1.3 dockerRepository: airbyte/destination-snowflake githubIssueLabel: destination-snowflake icon: snowflake.svg diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index b238e344ef8b..5a80360e48ae 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.1.3 | 2023-08-25 | [\#29881](https://github.com/airbytehq/airbyte/pull/29881) | Destinations v2: Only run T+D once at end of sync, to prevent data loss under async conditions | | 2.1.2 | 2023-08-24 | [\#29805](https://github.com/airbytehq/airbyte/pull/29805) | Destinations v2: Don't soft reset in migration | | 2.1.1 | 2023-08-23 | [\#29774](https://github.com/airbytehq/airbyte/pull/29774) | Destinations v2: Don't soft reset overwrite syncs | | 2.1.0 | 2023-08-21 | [\#29636](https://github.com/airbytehq/airbyte/pull/29636) | Destinations v2: Several Critical Bug Fixes (cursorless dedup, improved floating-point handling, improved special characters handling; improved error handling) |