From 3f9cbecbaf08ca66fe865d2d954ff9ae2d2feab5 Mon Sep 17 00:00:00 2001 From: Koji Matsumoto Date: Tue, 18 Jan 2022 21:41:12 +0900 Subject: [PATCH] Destination BigQuery: Accept Dataset ID field prefixed by Project ID (#8383) * add Dataset ID parse method * add BigQuery Destination unit test * update change log * fit to the latest code base * update change log * change var name to const name * change public method to private * add test cases for testGetDatasetIdFail * add integration test for dataset-id prefixed with project-id * fix getDatasetId * add comment to parameterized test provider * update docker image versions * update docker image versions again --- .../079d5540-f236-4294-ba7c-ade8fd918496.json | 2 +- .../22f6c74f-5699-40ff-833c-4a879ea40133.json | 2 +- .../seed/destination_definitions.yaml | 4 +- .../resources/seed/destination_specs.yaml | 4 +- .../Dockerfile | 2 +- .../destination-bigquery/Dockerfile | 2 +- .../bigquery/BigQueryDestination.java | 2 +- .../destination/bigquery/BigQueryUtils.java | 22 +++++- .../bigquery/BigQueryDestinationTest.java | 61 +++++++++++++--- .../bigquery/BigQueryUtilsTest.java | 69 +++++++++++++++++++ docs/integrations/destinations/bigquery.md | 2 + 11 files changed, 152 insertions(+), 20 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryUtilsTest.java diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json index aee3fcbfd47a..d433add5e807 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json @@ -2,7 +2,7 @@ 
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496", "name": "BigQuery (denormalized typed struct)", "dockerRepository": "airbyte/destination-bigquery-denormalized", - "dockerImageTag": "0.2.3", + "dockerImageTag": "0.2.4", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery", "icon": "bigquery.svg" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json index e8bfbb235d97..d076c305f7c2 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json @@ -2,7 +2,7 @@ "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133", "name": "BigQuery", "dockerRepository": "airbyte/destination-bigquery", - "dockerImageTag": "0.6.3", + "dockerImageTag": "0.6.4", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery", "icon": "bigquery.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 1e561ba8289d..d1584307abe4 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -13,13 +13,13 @@ - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery - dockerImageTag: 0.6.3 + dockerImageTag: 0.6.4 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: 
airbyte/destination-bigquery-denormalized - dockerImageTag: 0.2.3 + dockerImageTag: 0.2.4 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg - name: Cassandra diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 9276dac6c07b..235c7a9ebb71 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -188,7 +188,7 @@ supportsDBT: false supported_destination_sync_modes: - "append" -- dockerImage: "airbyte/destination-bigquery:0.6.3" +- dockerImage: "airbyte/destination-bigquery:0.6.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -378,7 +378,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.3" +- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 45801e0f0113..11d11d4387ad 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery-denormalized COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.3 +LABEL io.airbyte.version=0.2.4 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 605329e28e0b..ef038a965403 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.3 +LABEL io.airbyte.version=0.6.4 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 4f4c3502afd6..e243a59ef9b7 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -56,7 +56,7 @@ public BigQueryDestination() { @Override public AirbyteConnectionStatus check(final JsonNode config) { try { - final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); + final String datasetId = BigQueryUtils.getDatasetId(config); final String datasetLocation = BigQueryUtils.getDatasetLocation(config); final BigQuery bigquery = getBigQuery(config); final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index fd19e8861e19..545e8aff5896 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ 
b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.bigquery; import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage; +import static java.util.Objects.isNull; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -162,6 +163,25 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) { return gcsJsonNode; } + public static String getDatasetId(final JsonNode config) { + String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); + + int colonIndex = datasetId.indexOf(":"); + if (colonIndex != -1) { + String projectIdPart = datasetId.substring(0, colonIndex); + String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + if (!(projectId.equals(projectIdPart))) { + throw new IllegalArgumentException(String.format( + "Project ID included in Dataset ID must match Project ID field's value: Project ID is `%s`, but you specified `%s` in Dataset ID", + projectId, + projectIdPart)); + } + } + // if colonIndex is -1, then this returns the entire string + // otherwise it returns everything after the colon + return datasetId.substring(colonIndex + 1); + } + public static String getDatasetLocation(final JsonNode config) { if (config.has(BigQueryConsts.CONFIG_DATASET_LOCATION)) { return config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText(); @@ -214,7 +234,7 @@ public static void transformJsonDateTimeToBigDataFormat(List dateTimeFie } public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) { - final String defaultSchema = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); + final String defaultSchema = getDatasetId(config); final String srcNamespace = stream.getStream().getNamespace(); if (srcNamespace == null) { return defaultSchema; diff --git 
a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index d364c479715b..70c7e9dd1627 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -55,13 +55,18 @@ import java.time.Instant; import java.util.List; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -197,16 +202,20 @@ void testSpec() throws Exception { assertEquals(expected, actual); } - @Test - void testCheckSuccess() { + @ParameterizedTest + @MethodSource("datasetIdResetterProvider") + void testCheckSuccess(DatasetIdResetter resetDatasetId) { + resetDatasetId.accept(config); final AirbyteConnectionStatus actual = new BigQueryDestination().check(config); final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); assertEquals(expected, actual); } - @Test - void testCheckFailure() { + @ParameterizedTest + @MethodSource("datasetIdResetterProvider") + void testCheckFailure(DatasetIdResetter resetDatasetId) { 
((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake"); + resetDatasetId.accept(config); final AirbyteConnectionStatus actual = new BigQueryDestination().check(config); final String actualMessage = actual.getMessage(); LOGGER.info("Checking expected failure message:" + actualMessage); @@ -215,8 +224,10 @@ void testCheckFailure() { assertEquals(expected, actual.withMessage("")); } - @Test - void testWriteSuccess() throws Exception { + @ParameterizedTest + @MethodSource("datasetIdResetterProvider") + void testWriteSuccess(DatasetIdResetter resetDatasetId) throws Exception { + resetDatasetId.accept(config); final BigQueryDestination destination = new BigQueryDestination(); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); @@ -244,8 +255,10 @@ void testWriteSuccess() throws Exception { .collect(Collectors.toList())); } - @Test - void testWriteFailure() throws Exception { + @ParameterizedTest + @MethodSource("datasetIdResetterProvider") + void testWriteFailure(DatasetIdResetter resetDatasetId) throws Exception { + resetDatasetId.accept(config); // hack to force an exception to be thrown from within the consumer. 
final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1); doThrow(new RuntimeException()).when(spiedMessage).getRecord(); @@ -305,8 +318,10 @@ private List retrieveRecords(final String tableName) throws Exception .collect(Collectors.toList()); } - @Test - void testWritePartitionOverUnpartitioned() throws Exception { + @ParameterizedTest + @MethodSource("datasetIdResetterProvider") + void testWritePartitionOverUnpartitioned(DatasetIdResetter resetDatasetId) throws Exception { + resetDatasetId.accept(config); final String raw_table_name = String.format("_airbyte_raw_%s", USERS_STREAM_NAME); createUnpartitionedTable(bigquery, dataset, raw_table_name); assertFalse(isTablePartitioned(bigquery, dataset, raw_table_name)); @@ -369,4 +384,30 @@ private boolean isTablePartitioned(final BigQuery bigquery, final Dataset datase return false; } + private static class DatasetIdResetter { + private Consumer consumer; + + DatasetIdResetter(Consumer consumer) { + this.consumer = consumer; + } + + public void accept(JsonNode config) { + consumer.accept(config); + } + } + + private static Stream datasetIdResetterProvider() { + // parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id` + return Stream.of( + Arguments.arguments(new DatasetIdResetter(config -> {})), + Arguments.arguments(new DatasetIdResetter( + config -> { + String projectId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + String datasetId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_DATASET_ID).asText(); + ((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID, + String.format("%s:%s", projectId, datasetId)); + } + )) + ); + } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryUtilsTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryUtilsTest.java new file mode 100644 index 
000000000000..586e0cf7ce74 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryUtilsTest.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class BigQueryUtilsTest { + + private ImmutableMap.Builder configMapBuilder; + + @BeforeEach + public void init() { + configMapBuilder = ImmutableMap.builder() + .put(BigQueryConsts.CONFIG_CREDS, "test_secret") + .put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US"); + } + + @ParameterizedTest + @MethodSource("validBigQueryIdProvider") + public void testGetDatasetIdSuccess(String projectId, String datasetId, String expected) throws Exception { + JsonNode config = Jsons.jsonNode(configMapBuilder + .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) + .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) + .build()); + + String actual = BigQueryUtils.getDatasetId(config); + + assertEquals(expected, actual); + } + + @ParameterizedTest + @MethodSource("invalidBigQueryIdProvider") + public void testGetDatasetIdFail(String projectId, String datasetId, String expected) throws Exception { + JsonNode config = Jsons.jsonNode(configMapBuilder + .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) + .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) + .build()); + + Exception exception = assertThrows(IllegalArgumentException.class, () -> 
BigQueryUtils.getDatasetId(config)); + + assertEquals(expected, exception.getMessage()); + } + + private static Stream validBigQueryIdProvider() { + return Stream.of( + Arguments.arguments("my-project", "my_dataset", "my_dataset"), + Arguments.arguments("my-project", "my-project:my_dataset", "my_dataset")); + } + + private static Stream invalidBigQueryIdProvider() { + return Stream.of( + Arguments.arguments("my-project", ":my_dataset", + "Project ID included in Dataset ID must match Project ID field's value: Project ID is `my-project`, but you specified `` in Dataset ID"), + Arguments.arguments("my-project", "your-project:my_dataset", + "Project ID included in Dataset ID must match Project ID field's value: Project ID is `my-project`, but you specified `your-project` in Dataset ID")); + } +} diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 3a72ac40c336..fffbea6c7c4f 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -153,6 +153,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | |:--------| :--- | :--- | :--- | +| 0.6.4 | 2022-01-17 | [\#8383](https://github.com/airbytehq/airbyte/pull/8383) | Support dataset-id prefixed by project-id | | 0.6.3 | 2022-01-12 | [\#9415](https://github.com/airbytehq/airbyte/pull/9415) | BigQuery Destination : Fix GCS processing of Facebook data | | 0.6.2 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user | | 0.6.1 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging | @@ -172,6 +173,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | 
|:--------|:-----------|:-----------------------------------------------------------| :--- | +| 0.2.4 | 2022-01-17 | [\#8383](https://github.com/airbytehq/airbyte/pull/8383) | BigQuery/BigQuery denorm Destinations : Support dataset-id prefixed by project-id | | 0.2.3 | 2022-01-12 | [\#9415](https://github.com/airbytehq/airbyte/pull/9415) | BigQuery Destination : Fix GCS processing of Facebook data | | 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging | | 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |