diff --git a/docs/content.zh/docs/connectors/table/dynamodb.md b/docs/content.zh/docs/connectors/table/dynamodb.md
index ada2a463..1522a39a 100644
--- a/docs/content.zh/docs/connectors/table/dynamodb.md
+++ b/docs/content.zh/docs/connectors/table/dynamodb.md
@@ -236,6 +236,13 @@ Connector Options
Boolean |
Flag used for retrying failed requests. If set any request failure will not be retried and will fail the job. |
+
+ sink.ignore-nulls |
+ optional |
+ false |
+ Boolean |
+ Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing. |
+
diff --git a/docs/content/docs/connectors/table/dynamodb.md b/docs/content/docs/connectors/table/dynamodb.md
index e4952b22..66fd1078 100644
--- a/docs/content/docs/connectors/table/dynamodb.md
+++ b/docs/content/docs/connectors/table/dynamodb.md
@@ -236,6 +236,13 @@ Connector Options
Boolean |
Flag used for retrying failed requests. If set any request failure will not be retried and will fail the job. |
+
+ sink.ignore-nulls |
+ optional |
+ false |
+ Boolean |
+ Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing. |
+
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
index b3fe27df..d3777f0b 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
@@ -27,6 +27,7 @@
import java.util.Properties;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
+import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
/** DynamoDb specific configuration. */
@@ -53,6 +54,10 @@ public boolean getFailOnError() {
return tableOptions.get(FAIL_ON_ERROR);
}
+ public boolean getIgnoreNulls() {
+ return tableOptions.get(IGNORE_NULLS);
+ }
+
public Properties getAsyncSinkProperties() {
return asyncSinkConfigurationValidator.getValidatedConfigurations();
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java
index 899b6905..376138c5 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java
@@ -46,6 +46,13 @@ public class DynamoDbConnectorOptions {
.withDescription(
"Determines whether an exception should fail the job, otherwise failed requests are retried.");
+ public static final ConfigOption IGNORE_NULLS =
+ ConfigOptions.key("sink.ignore-nulls")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing.");
+
private DynamoDbConnectorOptions() {
// private constructor to prevent initialization of static class
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java
index d79cd786..8091306f 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java
@@ -48,6 +48,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink overwriteByPartitionKeys;
@@ -60,6 +61,7 @@ protected DynamoDbDynamicSink(
@Nullable Long maxTimeInBufferMS,
String tableName,
boolean failOnError,
+ boolean ignoreNulls,
Properties dynamoDbClientProperties,
DataType physicalDataType,
Set overwriteByPartitionKeys) {
@@ -71,6 +73,7 @@ protected DynamoDbDynamicSink(
maxTimeInBufferMS);
this.tableName = tableName;
this.failOnError = failOnError;
+ this.ignoreNulls = ignoreNulls;
this.dynamoDbClientProperties = dynamoDbClientProperties;
this.physicalDataType = physicalDataType;
this.overwriteByPartitionKeys = overwriteByPartitionKeys;
@@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setFailOnError(failOnError)
.setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
.setDynamoDbProperties(dynamoDbClientProperties)
- .setElementConverter(new RowDataElementConverter(physicalDataType));
+ .setElementConverter(
+ new RowDataElementConverter(physicalDataType, ignoreNulls));
addAsyncOptionsToSinkBuilder(builder);
@@ -106,6 +110,7 @@ public DynamicTableSink copy() {
maxTimeInBufferMS,
tableName,
failOnError,
+ ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
@@ -133,6 +138,7 @@ public static class DynamoDbDynamicTableSinkBuilder
DynamoDbWriteRequest, DynamoDbDynamicTableSinkBuilder> {
private String tableName;
private boolean failOnError;
+ private boolean ignoreNulls;
private Properties dynamoDbClientProperties;
private DataType physicalDataType;
private Set overwriteByPartitionKeys;
@@ -147,6 +153,11 @@ public DynamoDbDynamicTableSinkBuilder setFailOnError(boolean failOnError) {
return this;
}
+ public DynamoDbDynamicTableSinkBuilder setIgnoreNulls(boolean ignoreNulls) {
+ this.ignoreNulls = ignoreNulls;
+ return this;
+ }
+
public DynamoDbDynamicTableSinkBuilder setDynamoDbClientProperties(
Properties dynamoDbClientProperties) {
this.dynamoDbClientProperties = dynamoDbClientProperties;
@@ -174,6 +185,7 @@ public AsyncDynamicTableSink build() {
getMaxTimeInBufferMS(),
tableName,
failOnError,
+ ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
index 008828cc..8a1033da 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
@@ -52,6 +52,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
DynamoDbDynamicSink.builder()
.setTableName(dynamoDbConfiguration.getTableName())
.setFailOnError(dynamoDbConfiguration.getFailOnError())
+ .setIgnoreNulls(dynamoDbConfiguration.getIgnoreNulls())
.setPhysicalDataType(
catalogTable.getResolvedSchema().toPhysicalRowDataType())
.setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys()))
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java
index d0c4c90f..d156dbcc 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java
@@ -35,6 +35,7 @@
@Internal
public class RowDataElementConverter implements ElementConverter {
+ private boolean ignoreNulls = false;
private final DataType physicalDataType;
private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter;
@@ -44,11 +45,18 @@ public RowDataElementConverter(DataType physicalDataType) {
new RowDataToAttributeValueConverter(physicalDataType);
}
+ public RowDataElementConverter(DataType physicalDataType, boolean ignoreNulls) {
+ this.ignoreNulls = ignoreNulls;
+ this.physicalDataType = physicalDataType;
+ this.rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
+ }
+
@Override
public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
if (rowDataToAttributeValueConverter == null) {
rowDataToAttributeValueConverter =
- new RowDataToAttributeValueConverter(physicalDataType);
+ new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
}
DynamoDbWriteRequest.Builder builder =
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
index 1b7a05e4..dbfa66bf 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
@@ -43,14 +43,21 @@ public class RowDataToAttributeValueConverter {
private final DataType physicalDataType;
private final TableSchema tableSchema;
+ private boolean ignoreNulls = false;
public RowDataToAttributeValueConverter(DataType physicalDataType) {
this.physicalDataType = physicalDataType;
this.tableSchema = createTableSchema();
}
+ public RowDataToAttributeValueConverter(DataType physicalDataType, boolean ignoreNulls) {
+ this.physicalDataType = physicalDataType;
+ this.tableSchema = createTableSchema();
+ this.ignoreNulls = ignoreNulls;
+ }
+
public Map convertRowData(RowData row) {
- return tableSchema.itemToMap(row, false);
+ return tableSchema.itemToMap(row, ignoreNulls);
}
private StaticTableSchema createTableSchema() {
@@ -85,7 +92,7 @@ private StaticTableSchema.Builder addAttribute(
rowData ->
DataStructureConverters.getConverter(
field.getDataType())
- .toExternal(
+ .toExternalOrNull(
fieldGetter.getFieldOrNull(
rowData)))
.setter(((rowData, t) -> {})));
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
index 0bbe1329..862ab421 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
@@ -47,6 +47,7 @@
import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.AWS_REGION;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
+import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.assertj.core.api.Assertions.assertThat;
@@ -137,7 +138,10 @@ void testCopyTableSink() {
void testGoodTableSinkWithOptionalOptions() {
ResolvedSchema sinkSchema = defaultSinkSchema();
Map sinkOptions =
- defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build();
+ defaultSinkOptions()
+ .withTableOption(FAIL_ON_ERROR, "true")
+ .withTableOption(IGNORE_NULLS, "true")
+ .build();
List partitionKeys = Collections.singletonList("partition_key");
// Construct actual sink
@@ -153,6 +157,7 @@ void testGoodTableSinkWithOptionalOptions() {
.setDynamoDbClientProperties(defaultSinkProperties())
.setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
.setFailOnError(true)
+ .setIgnoreNulls(true)
.build();
assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink);
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
index 5566ae10..612aa9a7 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
@@ -61,6 +61,19 @@ void testChar() {
assertThat(actualResult).containsAllEntriesOf(expectedResult);
}
+ @Test
+ void testCharNull() {
+ String key = "key";
+
+ DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.CHAR(9)));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, true);
+ Map actualResult =
+ rowDataToAttributeValueConverter.convertRowData(createElement(null));
+
+ assertThat(actualResult.isEmpty()).isEqualTo(true);
+ }
+
@Test
void testVarChar() {
String key = "key";