From 841aaf711db9c8ec10d336b59bf14554489987b1 Mon Sep 17 00:00:00 2001 From: maoxd Date: Sat, 10 Aug 2024 23:23:31 +0800 Subject: [PATCH] [FLINK-36428][Connectors/DynamoDB] DynamoDb Table API Sink fails when null value in the RowData --- .../dynamodb/table/DynamoDbConfiguration.java | 5 +++++ .../dynamodb/table/DynamoDbConnectorOptions.java | 7 +++++++ .../dynamodb/table/DynamoDbDynamicSink.java | 14 +++++++++++++- .../dynamodb/table/DynamoDbDynamicSinkFactory.java | 1 + .../dynamodb/table/RowDataElementConverter.java | 10 +++++++++- .../table/RowDataToAttributeValueConverter.java | 11 +++++++++-- 6 files changed, 44 insertions(+), 4 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java index b3fe27df..d3777f0b 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java @@ -27,6 +27,7 @@ import java.util.Properties; import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR; +import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS; import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME; /** DynamoDb specific configuration. */ @@ -53,6 +54,10 @@ public boolean getFailOnError() { return tableOptions.get(FAIL_ON_ERROR); } + public boolean getIgnoreNulls() { + return tableOptions.get(IGNORE_NULLS); + } + public Properties getAsyncSinkProperties() { return asyncSinkConfigurationValidator.getValidatedConfigurations(); } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java index 899b6905..376138c5 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java @@ -46,6 +46,13 @@ public class DynamoDbConnectorOptions { .withDescription( "Determines whether an exception should fail the job, otherwise failed requests are retried."); + public static final ConfigOption IGNORE_NULLS = + ConfigOptions.key("sink.ignore-nulls") + .booleanType() + .defaultValue(false) + .withDescription( + "Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing."); + private DynamoDbConnectorOptions() { // private constructor to prevent initialization of static class } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java index d79cd786..8091306f 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java @@ -48,6 +48,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink overwriteByPartitionKeys; @@ -60,6 +61,7 @@ protected DynamoDbDynamicSink( @Nullable Long maxTimeInBufferMS, String tableName, boolean failOnError, + boolean ignoreNulls, Properties dynamoDbClientProperties, DataType physicalDataType, Set overwriteByPartitionKeys) { @@ -71,6 +73,7 @@ protected DynamoDbDynamicSink( maxTimeInBufferMS); this.tableName = tableName; this.failOnError = failOnError; + this.ignoreNulls = ignoreNulls; this.dynamoDbClientProperties = dynamoDbClientProperties; this.physicalDataType = physicalDataType; this.overwriteByPartitionKeys = overwriteByPartitionKeys; @@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setFailOnError(failOnError) .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys)) .setDynamoDbProperties(dynamoDbClientProperties) - .setElementConverter(new RowDataElementConverter(physicalDataType)); + .setElementConverter( + new RowDataElementConverter(physicalDataType, ignoreNulls)); addAsyncOptionsToSinkBuilder(builder); @@ -106,6 +110,7 @@ public DynamicTableSink copy() { maxTimeInBufferMS, tableName, failOnError, + ignoreNulls, dynamoDbClientProperties, physicalDataType, overwriteByPartitionKeys); @@ -133,6 +138,7 @@ public static class DynamoDbDynamicTableSinkBuilder DynamoDbWriteRequest, DynamoDbDynamicTableSinkBuilder> { private String tableName; private boolean failOnError; + private boolean ignoreNulls; private Properties dynamoDbClientProperties; private DataType physicalDataType; private Set overwriteByPartitionKeys; @@ -147,6 +153,11 @@ public DynamoDbDynamicTableSinkBuilder setFailOnError(boolean failOnError) { return this; } + public DynamoDbDynamicTableSinkBuilder setIgnoreNulls(boolean ignoreNulls) { + this.ignoreNulls = ignoreNulls; + return this; + } + public DynamoDbDynamicTableSinkBuilder setDynamoDbClientProperties( Properties dynamoDbClientProperties) { this.dynamoDbClientProperties = dynamoDbClientProperties; @@ -174,6 +185,7 @@ public AsyncDynamicTableSink build() { getMaxTimeInBufferMS(), tableName, failOnError, + ignoreNulls, dynamoDbClientProperties, physicalDataType, overwriteByPartitionKeys); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java index 008828cc..8a1033da 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java @@ -52,6 +52,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { DynamoDbDynamicSink.builder() .setTableName(dynamoDbConfiguration.getTableName()) .setFailOnError(dynamoDbConfiguration.getFailOnError()) + .setIgnoreNulls(dynamoDbConfiguration.getIgnoreNulls()) .setPhysicalDataType( catalogTable.getResolvedSchema().toPhysicalRowDataType()) .setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys())) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java index d0c4c90f..d156dbcc 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java @@ -35,6 +35,7 @@ @Internal public class RowDataElementConverter implements ElementConverter { + private boolean ignoreNulls = false; private final DataType physicalDataType; private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter; @@ -44,11 +45,18 @@ public RowDataElementConverter(DataType physicalDataType) { new RowDataToAttributeValueConverter(physicalDataType); } + public RowDataElementConverter(DataType physicalDataType, boolean ignoreNulls) { + this.ignoreNulls = ignoreNulls; + this.physicalDataType = physicalDataType; + this.rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls); + } + @Override public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) { if (rowDataToAttributeValueConverter == null) { rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(physicalDataType); + new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls); } DynamoDbWriteRequest.Builder builder = diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java index 1b7a05e4..dbfa66bf 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java @@ -43,14 +43,21 @@ public class RowDataToAttributeValueConverter { private final DataType physicalDataType; private final TableSchema tableSchema; + private boolean ignoreNulls = false; public RowDataToAttributeValueConverter(DataType physicalDataType) { this.physicalDataType = physicalDataType; this.tableSchema = createTableSchema(); } + public RowDataToAttributeValueConverter(DataType physicalDataType, boolean ignoreNulls) { + this.physicalDataType = physicalDataType; + this.tableSchema = createTableSchema(); + this.ignoreNulls = ignoreNulls; + } + public Map convertRowData(RowData row) { - return tableSchema.itemToMap(row, false); + return tableSchema.itemToMap(row, ignoreNulls); } private StaticTableSchema createTableSchema() { @@ -85,7 +92,7 @@ private StaticTableSchema.Builder addAttribute( rowData -> DataStructureConverters.getConverter( field.getDataType()) - .toExternal( + .toExternalOrNull( fieldGetter.getFieldOrNull( rowData))) .setter(((rowData, t) -> {})));