From 86781b1c357c8fc1fc8c51870b828d9f8618afc1 Mon Sep 17 00:00:00 2001 From: maoxd Date: Sat, 10 Aug 2024 23:23:31 +0800 Subject: [PATCH] [FLINK-36428][Connectors/DynamoDB] DynamoDb Table API Sink fails when null value in the RowData --- docs/content.zh/docs/connectors/table/dynamodb.md | 7 +++++++ docs/content/docs/connectors/table/dynamodb.md | 7 +++++++ .../dynamodb/table/DynamoDbConfiguration.java | 5 +++++ .../dynamodb/table/DynamoDbConnectorOptions.java | 7 +++++++ .../dynamodb/table/DynamoDbDynamicSink.java | 14 +++++++++++++- .../dynamodb/table/DynamoDbDynamicSinkFactory.java | 1 + .../dynamodb/table/RowDataElementConverter.java | 10 +++++++++- .../table/RowDataToAttributeValueConverter.java | 11 +++++++++-- .../table/DynamoDbDynamicSinkFactoryTest.java | 7 ++++++- .../RowDataToAttributeValueConverterTest.java | 13 +++++++++++++ 10 files changed, 77 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/dynamodb.md b/docs/content.zh/docs/connectors/table/dynamodb.md index ada2a463..1522a39a 100644 --- a/docs/content.zh/docs/connectors/table/dynamodb.md +++ b/docs/content.zh/docs/connectors/table/dynamodb.md @@ -236,6 +236,13 @@ Connector Options Boolean Flag used for retrying failed requests. If set any request failure will not be retried and will fail the job. + +
sink.ignore-nulls
+ optional + false + Boolean + Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing. + diff --git a/docs/content/docs/connectors/table/dynamodb.md b/docs/content/docs/connectors/table/dynamodb.md index e4952b22..66fd1078 100644 --- a/docs/content/docs/connectors/table/dynamodb.md +++ b/docs/content/docs/connectors/table/dynamodb.md @@ -236,6 +236,13 @@ Connector Options Boolean Flag used for retrying failed requests. If set any request failure will not be retried and will fail the job. + +
sink.ignore-nulls
+ optional + false + Boolean + Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing. + diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java index b3fe27df..d3777f0b 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java @@ -27,6 +27,7 @@ import java.util.Properties; import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR; +import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS; import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME; /** DynamoDb specific configuration. */ @@ -53,6 +54,10 @@ public boolean getFailOnError() { return tableOptions.get(FAIL_ON_ERROR); } + public boolean getIgnoreNulls() { + return tableOptions.get(IGNORE_NULLS); + } + public Properties getAsyncSinkProperties() { return asyncSinkConfigurationValidator.getValidatedConfigurations(); } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java index 899b6905..376138c5 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java @@ -46,6 +46,13 @@ public class DynamoDbConnectorOptions { .withDescription( "Determines whether an exception should fail the job, otherwise failed requests are retried."); + public static final ConfigOption IGNORE_NULLS = + ConfigOptions.key("sink.ignore-nulls") + .booleanType() + .defaultValue(false) + .withDescription( + "Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing."); + private DynamoDbConnectorOptions() { // private constructor to prevent initialization of static class } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java index d79cd786..8091306f 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java @@ -48,6 +48,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink overwriteByPartitionKeys; @@ -60,6 +61,7 @@ protected DynamoDbDynamicSink( @Nullable Long maxTimeInBufferMS, String tableName, boolean failOnError, + boolean ignoreNulls, Properties dynamoDbClientProperties, DataType physicalDataType, Set overwriteByPartitionKeys) { @@ -71,6 +73,7 @@ protected DynamoDbDynamicSink( maxTimeInBufferMS); this.tableName = tableName; this.failOnError = failOnError; + this.ignoreNulls = ignoreNulls; this.dynamoDbClientProperties = dynamoDbClientProperties; this.physicalDataType = physicalDataType; this.overwriteByPartitionKeys = overwriteByPartitionKeys; @@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setFailOnError(failOnError) .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys)) .setDynamoDbProperties(dynamoDbClientProperties) - .setElementConverter(new RowDataElementConverter(physicalDataType)); + .setElementConverter( + new RowDataElementConverter(physicalDataType, ignoreNulls)); addAsyncOptionsToSinkBuilder(builder); @@ -106,6 +110,7 @@ public DynamicTableSink copy() { maxTimeInBufferMS, tableName, failOnError, + ignoreNulls, dynamoDbClientProperties, physicalDataType, overwriteByPartitionKeys); @@ -133,6 +138,7 @@ public static class DynamoDbDynamicTableSinkBuilder DynamoDbWriteRequest, DynamoDbDynamicTableSinkBuilder> { private String tableName; private boolean failOnError; + private boolean ignoreNulls; private Properties dynamoDbClientProperties; private DataType physicalDataType; private Set overwriteByPartitionKeys; @@ -147,6 +153,11 @@ public DynamoDbDynamicTableSinkBuilder setFailOnError(boolean failOnError) { return this; } + public DynamoDbDynamicTableSinkBuilder setIgnoreNulls(boolean ignoreNulls) { + this.ignoreNulls = ignoreNulls; + return this; + } + public DynamoDbDynamicTableSinkBuilder setDynamoDbClientProperties( Properties dynamoDbClientProperties) { this.dynamoDbClientProperties = dynamoDbClientProperties; @@ -174,6 +185,7 @@ public AsyncDynamicTableSink build() { getMaxTimeInBufferMS(), tableName, failOnError, + ignoreNulls, dynamoDbClientProperties, physicalDataType, overwriteByPartitionKeys); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java index 008828cc..8a1033da 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java @@ -52,6 +52,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { DynamoDbDynamicSink.builder() .setTableName(dynamoDbConfiguration.getTableName()) .setFailOnError(dynamoDbConfiguration.getFailOnError()) + .setIgnoreNulls(dynamoDbConfiguration.getIgnoreNulls()) .setPhysicalDataType( catalogTable.getResolvedSchema().toPhysicalRowDataType()) .setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys())) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java index d0c4c90f..d156dbcc 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java @@ -35,6 +35,7 @@ @Internal public class RowDataElementConverter implements ElementConverter { + private boolean ignoreNulls = false; private final DataType physicalDataType; private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter; @@ -44,11 +45,18 @@ public RowDataElementConverter(DataType physicalDataType) { new RowDataToAttributeValueConverter(physicalDataType); } + public RowDataElementConverter(DataType physicalDataType, boolean ignoreNulls) { + this.ignoreNulls = ignoreNulls; + this.physicalDataType = physicalDataType; + this.rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls); + } + @Override public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) { if (rowDataToAttributeValueConverter == null) { rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(physicalDataType); + new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls); } DynamoDbWriteRequest.Builder builder = diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java index 1b7a05e4..dbfa66bf 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java @@ -43,14 +43,21 @@ public class RowDataToAttributeValueConverter { private final DataType physicalDataType; private final TableSchema tableSchema; + private boolean ignoreNulls = false; public RowDataToAttributeValueConverter(DataType physicalDataType) { this.physicalDataType = physicalDataType; this.tableSchema = createTableSchema(); } + public RowDataToAttributeValueConverter(DataType physicalDataType, boolean ignoreNulls) { + this.physicalDataType = physicalDataType; + this.tableSchema = createTableSchema(); + this.ignoreNulls = ignoreNulls; + } + public Map convertRowData(RowData row) { - return tableSchema.itemToMap(row, false); + return tableSchema.itemToMap(row, ignoreNulls); } private StaticTableSchema createTableSchema() { @@ -85,7 +92,7 @@ private StaticTableSchema.Builder addAttribute( rowData -> DataStructureConverters.getConverter( field.getDataType()) - .toExternal( + .toExternalOrNull( fieldGetter.getFieldOrNull( rowData))) .setter(((rowData, t) -> {}))); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java index 0bbe1329..862ab421 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java @@ -47,6 +47,7 @@ import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS; import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.AWS_REGION; import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR; +import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS; import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.assertj.core.api.Assertions.assertThat; @@ -137,7 +138,10 @@ void testCopyTableSink() { void testGoodTableSinkWithOptionalOptions() { ResolvedSchema sinkSchema = defaultSinkSchema(); Map sinkOptions = - defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build(); + defaultSinkOptions() + .withTableOption(FAIL_ON_ERROR, "true") + .withTableOption(IGNORE_NULLS, "true") + .build(); List partitionKeys = Collections.singletonList("partition_key"); // Construct actual sink @@ -153,6 +157,7 @@ void testGoodTableSinkWithOptionalOptions() { .setDynamoDbClientProperties(defaultSinkProperties()) .setPhysicalDataType(sinkSchema.toPhysicalRowDataType()) .setFailOnError(true) + .setIgnoreNulls(true) .build(); assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java index 5566ae10..612aa9a7 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java @@ -61,6 +61,19 @@ void testChar() { assertThat(actualResult).containsAllEntriesOf(expectedResult); } + @Test + void testCharNull() { + String key = "key"; + + DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.CHAR(9))); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, true); + Map actualResult = + rowDataToAttributeValueConverter.convertRowData(createElement(null)); + + assertThat(actualResult.isEmpty()).isEqualTo(true); + } + @Test void testVarChar() { String key = "key";