Skip to content

Commit

Permalink
Destination Azure Blob Storage: Added BufferedOutputStream to fix blo…
Browse files Browse the repository at this point in the history
…ck count issue and improve performance (airbytehq#9190)
  • Loading branch information
bmatticus authored Jan 12, 2022
1 parent 5f6785d commit 80666cf
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb",
"name": "Azure Blob Storage",
"dockerRepository": "airbyte/destination-azure-blob-storage",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage",
"icon": "azureblobstorage.svg"
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
- name: Azure Blob Storage
destinationDefinitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb
dockerRepository: airbyte/destination-azure-blob-storage
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/azureblobstorage
icon: azureblobstorage.svg
- name: Amazon SQS
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is generated by io.airbyte.config.specs.SeedConnectorSpecGenerator.
# Do NOT edit this file directly. See generator class for more details.
---
- dockerImage: "airbyte/destination-azure-blob-storage:0.1.0"
- dockerImage: "airbyte/destination-azure-blob-storage:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/azureblobstorage"
connectionSpecification:
Expand Down Expand Up @@ -38,11 +38,23 @@
examples:
- "airbyte5storage"
azure_blob_storage_account_key:
title: "Azure Blob Storage account key"
description: "The Azure blob storage account key."
airbyte_secret: true
type: "string"
examples:
- "Z8ZkZpteggFx394vm+PJHnGTvdRncaYS+JhLKdj789YNmD+iyGTnG+PV+POiuYNhBg/ACS+LKjd%4FG3FHGN12Nd=="
azure_blob_storage_output_buffer_size:
title: "Azure Blob Storage output buffer size"
type: "integer"
description: "The amount of megabytes to buffer for the output stream to\
\ Azure. This will impact memory footprint on workers, but may need adjustment\
\ for performance and appropriate block size in Azure."
minimum: 1
maximum: 2047
default: 5
examples:
- 5
format:
title: "Output Format"
type: "object"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-azure-blob-storage

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/destination-azure-blob-storage
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ public class AzureBlobStorageDestinationConfig {
private final String accountName;
private final String accountKey;
private final String containerName;
private final int outputStreamBufferSize;
private final AzureBlobStorageFormatConfig formatConfig;

/**
 * Builds a configuration holder from already-resolved values. Every argument is stored
 * unchanged in the matching final field.
 *
 * @param endpointUrl value kept as the endpoint URL
 * @param accountName value kept as the account name
 * @param accountKey value kept as the account key
 * @param containerName value kept as the container name
 * @param outputStreamBufferSize raw buffer size; {@code getOutputStreamBufferSize()} treats it
 *        as megabytes and converts it to bytes
 * @param formatConfig value kept as the output format configuration
 */
public AzureBlobStorageDestinationConfig(
    final String endpointUrl,
    final String accountName,
    final String accountKey,
    final String containerName,
    final int outputStreamBufferSize,
    final AzureBlobStorageFormatConfig formatConfig) {
  // Connection coordinates.
  this.endpointUrl = endpointUrl;
  this.containerName = containerName;

  // Credentials.
  this.accountName = accountName;
  this.accountKey = accountKey;

  // Output behaviour.
  this.formatConfig = formatConfig;
  this.outputStreamBufferSize = outputStreamBufferSize;
}

Expand All @@ -50,12 +53,22 @@ public AzureBlobStorageFormatConfig getFormatConfig() {
return formatConfig;
}

/**
 * Returns the configured output stream buffer size converted from megabytes to bytes.
 *
 * @return the buffer size in bytes
 * @throws ArithmeticException if the configured megabyte value is so large that the byte
 *         count does not fit in an {@code int}
 */
public int getOutputStreamBufferSize() {
  // Convert from MB to bytes. multiplyExact fails loudly instead of silently wrapping to a
  // negative, zero, or truncated size when the configured value is too large for an int.
  return Math.multiplyExact(outputStreamBufferSize, 1024 * 1024);
}

public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final JsonNode config) {
final String accountNameFomConfig = config.get("azure_blob_storage_account_name").asText();
final String accountKeyFromConfig = config.get("azure_blob_storage_account_key").asText();
final JsonNode endpointFromConfig = config
.get("azure_blob_storage_endpoint_domain_name");
final JsonNode containerName = config.get("azure_blob_storage_container_name");
final int outputStreamBufferSizeFromConfig =
config.get("azure_blob_storage_output_buffer_size") != null
? config.get("azure_blob_storage_output_buffer_size").asInt(DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE)
: DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE;

final JsonNode blobName = config.get("azure_blob_storage_blob_name"); // streamId

final String endpointComputed = String.format(Locale.ROOT, DEFAULT_STORAGE_ENDPOINT_FORMAT,
Expand All @@ -72,6 +85,7 @@ public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final
accountNameFomConfig,
accountKeyFromConfig,
containerNameComputed,
outputStreamBufferSizeFromConfig,
AzureBlobStorageFormatConfigs.getAzureBlobStorageFormatConfig(config));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public final class AzureBlobStorageDestinationConstants {
public static final String DEFAULT_STORAGE_ENDPOINT_HTTP_PROTOCOL = "https";
public static final String DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME = "blob.core.windows.net";
public static final String DEFAULT_STORAGE_ENDPOINT_FORMAT = "%s://%s.%s";
public static final int DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE = 5;

private AzureBlobStorageDestinationConstants() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,7 +29,7 @@ public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implem

private final CsvSheetGenerator csvSheetGenerator;
private final CSVPrinter csvPrinter;
private final BlobOutputStream blobOutputStream;
private final BufferedOutputStream blobOutputStream;

public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config,
final AppendBlobClient appendBlobClient,
Expand All @@ -44,17 +45,17 @@ public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config,
.create(configuredStream.getStream().getJsonSchema(),
formatConfig);

this.blobOutputStream = appendBlobClient.getBlobOutputStream();
this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize());

if (isNewlyCreatedBlob) {
this.csvPrinter = new CSVPrinter(
new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8),
new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8),
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)
.withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0])));
} else {
// no header required for append
this.csvPrinter = new CSVPrinter(
new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8),
new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8),
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
Expand All @@ -32,7 +33,7 @@ public class AzureBlobStorageJsonlWriter extends BaseAzureBlobStorageWriter impl
private static final ObjectMapper MAPPER = MoreMappers.initMapper();
private static final ObjectWriter WRITER = MAPPER.writer();

private final BlobOutputStream blobOutputStream;
private final BufferedOutputStream blobOutputStream;
private final PrintWriter printWriter;

public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig config,
Expand All @@ -41,8 +42,8 @@ public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig confi
final boolean isNewlyCreatedBlob) {
super(config, appendBlobClient, configuredStream);
// at this moment we already receive appendBlobClient initialized
this.blobOutputStream = appendBlobClient.getBlobOutputStream();
this.printWriter = new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8);
this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize());
this.printWriter = new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@
"examples": ["airbyte5storage"]
},
"azure_blob_storage_account_key": {
"title": "Azure Blob Storage account key",
"description": "The Azure blob storage account key.",
"airbyte_secret": true,
"type": "string",
"examples": [
"Z8ZkZpteggFx394vm+PJHnGTvdRncaYS+JhLKdj789YNmD+iyGTnG+PV+POiuYNhBg/ACS+LKjd%4FG3FHGN12Nd=="
]
},
"azure_blob_storage_output_buffer_size": {
"title": "Azure Blob Storage output buffer size (Megabytes)",
"type": "integer",
"description": "The amount of megabytes to buffer for the output stream to Azure. This will impact memory footprint on workers, but may need adjustment for performance and appropriate block size in Azure.",
"minimum": 1,
"maximum": 2047,
"default": 5,
"examples": [5]
},
"format": {
"title": "Output Format",
"type": "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public void testCheckInvalidAccountName() {
final JsonNode invalidConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("azure_blob_storage_account_name", "someInvalidName")
.put("azure_blob_storage_account_key", config.get("azure_blob_storage_account_key"))
.put("format", getJsonlFormatConfig())
.build());

final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination();
final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig);

Expand All @@ -58,6 +58,7 @@ public void testCheckInvalidKey() {
final JsonNode invalidConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("azure_blob_storage_account_name", config.get("azure_blob_storage_account_name"))
.put("azure_blob_storage_account_key", "someInvalidKey")
.put("format", getJsonlFormatConfig())
.build());
final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination();
final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig);
Expand All @@ -71,6 +72,7 @@ public void testCheckInvaliDomainName() {
.put("azure_blob_storage_account_name", config.get("azure_blob_storage_account_name"))
.put("azure_blob_storage_account_key", config.get("azure_blob_storage_account_key"))
.put("azure_blob_storage_endpoint_domain_name", "invalidDomain.com.invalid123")
.put("format", getJsonlFormatConfig())
.build());
final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination();
final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ public void testSpec() throws Exception {
assertNotNull(connectionSpecification);
}

@Test
public void testConfigObjectCustomOutputBufferSize() {
  // A buffer size of 10 in the raw config must be reported by the config object as
  // 10 * 1024 * 1024.
  final JsonNode rawConfig = Jsons.jsonNode(ImmutableMap.builder()
      .put("azure_blob_storage_account_name", "accName")
      .put("azure_blob_storage_account_key", "accKey")
      .put("azure_blob_storage_endpoint_domain_name", "accDomainName.com")
      .put("azure_blob_storage_output_buffer_size", 10)
      .put("format", getFormatConfig())
      .build());

  final int actualBufferSize = AzureBlobStorageDestinationConfig
      .getAzureBlobStorageConfig(rawConfig)
      .getOutputStreamBufferSize();

  assertEquals(10 * 1024 * 1024, actualBufferSize);
}

private JsonNode getFormatConfig() {
return Jsons.deserialize("{\n"
+ " \"format_type\": \"JSONL\"\n"
Expand Down
3 changes: 3 additions & 0 deletions docs/integrations/destinations/azureblobstorage.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The Airbyte Azure Blob Storage destination allows you to sync data to Azure Blob
| Azure blob storage container \(Bucket\) Name | string | The name of the Azure blob storage container. If it does not exist, it will be created automatically. If left empty, a container named airbytecontainer+timestamp will be created automatically. |
| Azure Blob Storage account name | string | The account's name of the Azure Blob Storage. |
| The Azure blob storage account key | string | Azure blob storage account key. Example: `abcdefghijklmnopqrstuvwxyz/0123456789+ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789%++sampleKey==`. |
| Azure Blob Storage output buffer size | integer | Azure Blob Storage output buffer size, in megabytes. Example: 5 |
| Format | object | Format specific configuration. See below for details. |

⚠️ Please note that under "Full Refresh Sync" mode, data in the configured blob will be wiped out before each sync. We recommend you to provision a dedicated Azure Blob Storage Container resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️
Expand Down Expand Up @@ -136,5 +137,7 @@ They will be like this in the output file:

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.1 | 2021-12-29 | [\#9190](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. |
| 0.1.0 | 2021-08-30 | [\#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. |


0 comments on commit 80666cf

Please sign in to comment.