diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json index 4582e3038295..3c436bbd29d6 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json @@ -2,7 +2,7 @@ "destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb", "name": "Azure Blob Storage", "dockerRepository": "airbyte/destination-azure-blob-storage", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage", "icon": "azureblobstorage.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index bfcd84549460..38afff07e973 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -1,7 +1,7 @@ - name: Azure Blob Storage destinationDefinitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb dockerRepository: airbyte/destination-azure-blob-storage - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/destinations/azureblobstorage icon: azureblobstorage.svg - name: Amazon SQS diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index eaf255880023..4ce2d219022d 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -1,7 +1,7 @@ # This file is generated by 
io.airbyte.config.specs.SeedConnectorSpecGenerator. # Do NOT edit this file directly. See generator class for more details. --- -- dockerImage: "airbyte/destination-azure-blob-storage:0.1.0" +- dockerImage: "airbyte/destination-azure-blob-storage:0.1.1" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/azureblobstorage" connectionSpecification: @@ -38,11 +38,23 @@ examples: - "airbyte5storage" azure_blob_storage_account_key: + title: "Azure Blob Storage account key" description: "The Azure blob storage account key." airbyte_secret: true type: "string" examples: - "Z8ZkZpteggFx394vm+PJHnGTvdRncaYS+JhLKdj789YNmD+iyGTnG+PV+POiuYNhBg/ACS+LKjd%4FG3FHGN12Nd==" + azure_blob_storage_output_buffer_size: + title: "Azure Blob Storage output buffer size" + type: "integer" + description: "The amount of megabytes to buffer for the output stream to\ + \ Azure. This will impact memory footprint on workers, but may need adjustment\ + \ for performance and appropriate block size in Azure." 
+ minimum: 1 + maximum: 2047 + default: 5 + examples: + - 5 format: title: "Output Format" type: "object" diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile index a33b5ab5272d..8e644aa025fe 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-azure-blob-storage COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/destination-azure-blob-storage diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java index fb2777db8a5e..8d575214b678 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java @@ -15,6 +15,7 @@ public class AzureBlobStorageDestinationConfig { private final String accountName; private final String accountKey; private final String containerName; + private final int outputStreamBufferSize; private final AzureBlobStorageFormatConfig formatConfig; public AzureBlobStorageDestinationConfig( @@ -22,11 +23,13 @@ public AzureBlobStorageDestinationConfig( final String accountName, final String accountKey, final String containerName, + final int outputStreamBufferSize, final AzureBlobStorageFormatConfig formatConfig) { 
this.endpointUrl = endpointUrl; this.accountName = accountName; this.accountKey = accountKey; this.containerName = containerName; + this.outputStreamBufferSize = outputStreamBufferSize; this.formatConfig = formatConfig; } @@ -50,12 +53,22 @@ public AzureBlobStorageFormatConfig getFormatConfig() { return formatConfig; } + public int getOutputStreamBufferSize() { + // Convert from MB to Bytes + return outputStreamBufferSize * 1024 * 1024; + } + public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final JsonNode config) { final String accountNameFomConfig = config.get("azure_blob_storage_account_name").asText(); final String accountKeyFromConfig = config.get("azure_blob_storage_account_key").asText(); final JsonNode endpointFromConfig = config .get("azure_blob_storage_endpoint_domain_name"); final JsonNode containerName = config.get("azure_blob_storage_container_name"); + final int outputStreamBufferSizeFromConfig = + config.get("azure_blob_storage_output_buffer_size") != null + ? 
config.get("azure_blob_storage_output_buffer_size").asInt(DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE) + : DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE; + final JsonNode blobName = config.get("azure_blob_storage_blob_name"); // streamId final String endpointComputed = String.format(Locale.ROOT, DEFAULT_STORAGE_ENDPOINT_FORMAT, @@ -72,6 +85,7 @@ public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final accountNameFomConfig, accountKeyFromConfig, containerNameComputed, + outputStreamBufferSizeFromConfig, AzureBlobStorageFormatConfigs.getAzureBlobStorageFormatConfig(config)); } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java index 1cd15481997c..ebf5e90326f7 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java @@ -10,6 +10,7 @@ public final class AzureBlobStorageDestinationConstants { public static final String DEFAULT_STORAGE_ENDPOINT_HTTP_PROTOCOL = "https"; public static final String DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME = "blob.core.windows.net"; public static final String DEFAULT_STORAGE_ENDPOINT_FORMAT = "%s://%s.%s"; + public static final int DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE = 5; private AzureBlobStorageDestinationConstants() {} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java 
b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java index 420202eac6d1..23e31bbf4d9c 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java @@ -11,6 +11,7 @@ import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; @@ -28,7 +29,7 @@ public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implem private final CsvSheetGenerator csvSheetGenerator; private final CSVPrinter csvPrinter; - private final BlobOutputStream blobOutputStream; + private final BufferedOutputStream blobOutputStream; public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config, final AppendBlobClient appendBlobClient, @@ -44,17 +45,17 @@ public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config, .create(configuredStream.getStream().getJsonSchema(), formatConfig); - this.blobOutputStream = appendBlobClient.getBlobOutputStream(); + this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize()); if (isNewlyCreatedBlob) { this.csvPrinter = new CSVPrinter( - new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8), + new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) .withHeader(csvSheetGenerator.getHeaderRow().toArray(new 
String[0]))); } else { // no header required for append this.csvPrinter = new CSVPrinter( - new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8), + new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)); } } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java index aed0849ede5e..6a0406be7a7e 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java @@ -17,6 +17,7 @@ import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; @@ -32,7 +33,7 @@ public class AzureBlobStorageJsonlWriter extends BaseAzureBlobStorageWriter impl private static final ObjectMapper MAPPER = MoreMappers.initMapper(); private static final ObjectWriter WRITER = MAPPER.writer(); - private final BlobOutputStream blobOutputStream; + private final BufferedOutputStream blobOutputStream; private final PrintWriter printWriter; public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig config, @@ -41,8 +42,8 @@ public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig confi final boolean isNewlyCreatedBlob) { super(config, 
appendBlobClient, configuredStream); // at this moment we already receive appendBlobClient initialized - this.blobOutputStream = appendBlobClient.getBlobOutputStream(); - this.printWriter = new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8); + this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize()); + this.printWriter = new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8); } @Override diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json index b3b7c6ea7897..d07ea2ae7f01 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json @@ -35,6 +35,7 @@ "examples": ["airbyte5storage"] }, "azure_blob_storage_account_key": { + "title": "Azure Blob Storage account key", "description": "The Azure blob storage account key.", "airbyte_secret": true, "type": "string", @@ -42,6 +43,15 @@ "Z8ZkZpteggFx394vm+PJHnGTvdRncaYS+JhLKdj789YNmD+iyGTnG+PV+POiuYNhBg/ACS+LKjd%4FG3FHGN12Nd==" ] }, + "azure_blob_storage_output_buffer_size": { + "title": "Azure Blob Storage output buffer size (Megabytes)", + "type": "integer", + "description": "The amount of megabytes to buffer for the output stream to Azure. 
This will impact memory footprint on workers, but may need adjustment for performance and appropriate block size in Azure.", + "minimum": 1, + "maximum": 2047, + "default": 5, + "examples": [5] + }, "format": { "title": "Output Format", "type": "object", diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationAcceptanceTest.java index c067153276e9..f4679ccf8400 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationAcceptanceTest.java @@ -45,8 +45,8 @@ public void testCheckInvalidAccountName() { final JsonNode invalidConfig = Jsons.jsonNode(ImmutableMap.builder() .put("azure_blob_storage_account_name", "someInvalidName") .put("azure_blob_storage_account_key", config.get("azure_blob_storage_account_key")) + .put("format", getJsonlFormatConfig()) .build()); - final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig); @@ -58,6 +58,7 @@ public void testCheckInvalidKey() { final JsonNode invalidConfig = Jsons.jsonNode(ImmutableMap.builder() .put("azure_blob_storage_account_name", config.get("azure_blob_storage_account_name")) .put("azure_blob_storage_account_key", "someInvalidKey") + .put("format", getJsonlFormatConfig()) .build()); final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); 
final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig); @@ -71,6 +72,7 @@ public void testCheckInvaliDomainName() { .put("azure_blob_storage_account_name", config.get("azure_blob_storage_account_name")) .put("azure_blob_storage_account_key", config.get("azure_blob_storage_account_key")) .put("azure_blob_storage_endpoint_domain_name", "invalidDomain.com.invalid123") + .put("format", getJsonlFormatConfig()) .build()); final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig); diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationTest.java index 941bef0365c4..586debf3b06e 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationTest.java @@ -80,6 +80,22 @@ public void testSpec() throws Exception { assertNotNull(connectionSpecification); } + @Test + public void testConfigObjectCustomOutputBufferSize() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", "accName") + .put("azure_blob_storage_account_key", "accKey") + .put("azure_blob_storage_endpoint_domain_name", "accDomainName.com") + .put("azure_blob_storage_output_buffer_size", 10) + .put("format", getFormatConfig()) + .build()); + final AzureBlobStorageDestinationConfig azureBlobStorageConfig = AzureBlobStorageDestinationConfig + 
.getAzureBlobStorageConfig(config); + + assertEquals(10 * 1024 * 1024, + azureBlobStorageConfig.getOutputStreamBufferSize()); + } + private JsonNode getFormatConfig() { return Jsons.deserialize("{\n" + " \"format_type\": \"JSONL\"\n" diff --git a/docs/integrations/destinations/azureblobstorage.md b/docs/integrations/destinations/azureblobstorage.md index 8c8c29b26068..94809438f453 100644 --- a/docs/integrations/destinations/azureblobstorage.md +++ b/docs/integrations/destinations/azureblobstorage.md @@ -22,6 +22,7 @@ The Airbyte Azure Blob Storage destination allows you to sync data to Azure Blob | Azure blob storage container \(Bucket\) Name | string | A name of the Azure blob storage container. If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp. | | Azure Blob Storage account name | string | The account's name of the Azure Blob Storage. | | The Azure blob storage account key | string | Azure blob storage account key. Example: `abcdefghijklmnopqrstuvwxyz/0123456789+ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789%++sampleKey==`. | +| Azure Blob Storage output buffer size | integer | Azure Blob Storage output buffer size, in megabytes. Example: 5 | | Format | object | Format specific configuration. See below for details. | ⚠️ Please note that under "Full Refresh Sync" mode, data in the configured blob will be wiped out before each sync. We recommend you to provision a dedicated Azure Blob Storage Container resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️ @@ -136,5 +137,7 @@ They will be like this in the output file: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.1 | 2021-12-29 | [\#9190](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. 
| | 0.1.0 | 2021-08-30 | [\#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. | +