Skip to content

Commit

Permalink
[FLINK-36428][Connectors/DynamoDB] DynamoDb Table API Sink fails when…
Browse files Browse the repository at this point in the history
… null value in the RowData
  • Loading branch information
maoxingda committed Oct 7, 2024
1 parent b681cfc commit 841aaf7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Properties;

import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;

/** DynamoDb specific configuration. */
Expand All @@ -53,6 +54,10 @@ public boolean getFailOnError() {
return tableOptions.get(FAIL_ON_ERROR);
}

public boolean getIgnoreNulls() {
return tableOptions.get(IGNORE_NULLS);
}

public Properties getAsyncSinkProperties() {
return asyncSinkConfigurationValidator.getValidatedConfigurations();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public class DynamoDbConnectorOptions {
.withDescription(
"Determines whether an exception should fail the job, otherwise failed requests are retried.");

public static final ConfigOption<Boolean> IGNORE_NULLS =
ConfigOptions.key("sink.ignore-nulls")
.booleanType()
.defaultValue(false)
.withDescription(
"Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing.");

private DynamoDbConnectorOptions() {
// private constructor to prevent initialization of static class
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequ

private final String tableName;
private final boolean failOnError;
private final boolean ignoreNulls;
private final Properties dynamoDbClientProperties;
private final DataType physicalDataType;
private final Set<String> overwriteByPartitionKeys;
Expand All @@ -60,6 +61,7 @@ protected DynamoDbDynamicSink(
@Nullable Long maxTimeInBufferMS,
String tableName,
boolean failOnError,
boolean ignoreNulls,
Properties dynamoDbClientProperties,
DataType physicalDataType,
Set<String> overwriteByPartitionKeys) {
Expand All @@ -71,6 +73,7 @@ protected DynamoDbDynamicSink(
maxTimeInBufferMS);
this.tableName = tableName;
this.failOnError = failOnError;
this.ignoreNulls = ignoreNulls;
this.dynamoDbClientProperties = dynamoDbClientProperties;
this.physicalDataType = physicalDataType;
this.overwriteByPartitionKeys = overwriteByPartitionKeys;
Expand All @@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setFailOnError(failOnError)
.setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
.setDynamoDbProperties(dynamoDbClientProperties)
.setElementConverter(new RowDataElementConverter(physicalDataType));
.setElementConverter(
new RowDataElementConverter(physicalDataType, ignoreNulls));

addAsyncOptionsToSinkBuilder(builder);

Expand All @@ -106,6 +110,7 @@ public DynamicTableSink copy() {
maxTimeInBufferMS,
tableName,
failOnError,
ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
Expand Down Expand Up @@ -133,6 +138,7 @@ public static class DynamoDbDynamicTableSinkBuilder
DynamoDbWriteRequest, DynamoDbDynamicTableSinkBuilder> {
private String tableName;
private boolean failOnError;
private boolean ignoreNulls;
private Properties dynamoDbClientProperties;
private DataType physicalDataType;
private Set<String> overwriteByPartitionKeys;
Expand All @@ -147,6 +153,11 @@ public DynamoDbDynamicTableSinkBuilder setFailOnError(boolean failOnError) {
return this;
}

public DynamoDbDynamicTableSinkBuilder setIgnoreNulls(boolean ignoreNulls) {
this.ignoreNulls = ignoreNulls;
return this;
}

public DynamoDbDynamicTableSinkBuilder setDynamoDbClientProperties(
Properties dynamoDbClientProperties) {
this.dynamoDbClientProperties = dynamoDbClientProperties;
Expand Down Expand Up @@ -174,6 +185,7 @@ public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
getMaxTimeInBufferMS(),
tableName,
failOnError,
ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
DynamoDbDynamicSink.builder()
.setTableName(dynamoDbConfiguration.getTableName())
.setFailOnError(dynamoDbConfiguration.getFailOnError())
.setIgnoreNulls(dynamoDbConfiguration.getIgnoreNulls())
.setPhysicalDataType(
catalogTable.getResolvedSchema().toPhysicalRowDataType())
.setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
@Internal
public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {

private boolean ignoreNulls = false;
private final DataType physicalDataType;
private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter;

Expand All @@ -44,11 +45,18 @@ public RowDataElementConverter(DataType physicalDataType) {
new RowDataToAttributeValueConverter(physicalDataType);
}

public RowDataElementConverter(DataType physicalDataType, boolean ignoreNulls) {
this.ignoreNulls = ignoreNulls;
this.physicalDataType = physicalDataType;
this.rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
}

@Override
public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
if (rowDataToAttributeValueConverter == null) {
rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(physicalDataType);
new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
}

DynamoDbWriteRequest.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ public class RowDataToAttributeValueConverter {

private final DataType physicalDataType;
private final TableSchema<RowData> tableSchema;
private boolean ignoreNulls = false;

public RowDataToAttributeValueConverter(DataType physicalDataType) {
this.physicalDataType = physicalDataType;
this.tableSchema = createTableSchema();
}

public RowDataToAttributeValueConverter(DataType physicalDataType, boolean ignoreNulls) {
this.physicalDataType = physicalDataType;
this.tableSchema = createTableSchema();
this.ignoreNulls = ignoreNulls;
}

public Map<String, AttributeValue> convertRowData(RowData row) {
return tableSchema.itemToMap(row, false);
return tableSchema.itemToMap(row, ignoreNulls);
}

private StaticTableSchema<RowData> createTableSchema() {
Expand Down Expand Up @@ -85,7 +92,7 @@ private StaticTableSchema.Builder<RowData> addAttribute(
rowData ->
DataStructureConverters.getConverter(
field.getDataType())
.toExternal(
.toExternalOrNull(
fieldGetter.getFieldOrNull(
rowData)))
.setter(((rowData, t) -> {})));
Expand Down

0 comments on commit 841aaf7

Please sign in to comment.