Commit 86781b1
[FLINK-36428][Connectors/DynamoDB] DynamoDb Table API Sink fails when null value in the RowData
maoxingda committed Oct 14, 2024
1 parent b681cfc commit 86781b1
Showing 10 changed files with 77 additions and 5 deletions.
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/dynamodb.md
@@ -236,6 +236,13 @@ Connector Options
<td>Boolean</td>
<td>Flag used for retrying failed requests. If set to true, any request failure will not be retried and will fail the job.</td>
</tr>
<tr>
<td><h5>sink.ignore-nulls</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Determines whether null values should be ignored in the sink. If set to true, attributes whose value is null are omitted from the item written to DynamoDB.</td>
</tr>
</tbody>
<thead>
<tr>
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/dynamodb.md
@@ -236,6 +236,13 @@ Connector Options
<td>Boolean</td>
<td>Flag used for retrying failed requests. If set to true, any request failure will not be retried and will fail the job.</td>
</tr>
<tr>
<td><h5>sink.ignore-nulls</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Determines whether null values should be ignored in the sink. If set to true, attributes whose value is null are omitted from the item written to DynamoDB.</td>
</tr>
</tbody>
<thead>
<tr>
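For context, a minimal sketch of how the documented option could be enabled from the Table API; the table name dynamo_sink, the DynamoDB table 'orders', the region, and the column names are illustrative placeholders, not part of this change.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreNullsOptionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical sink table. 'sink.ignore-nulls' = 'true' makes the sink omit
        // attributes whose value is null from the item written to DynamoDB.
        tEnv.executeSql(
                "CREATE TABLE dynamo_sink ("
                        + " user_id STRING,"
                        + " order_note STRING"
                        + ") WITH ("
                        + " 'connector' = 'dynamodb',"
                        + " 'table-name' = 'orders',"
                        + " 'aws.region' = 'us-east-1',"
                        + " 'sink.ignore-nulls' = 'true'"
                        + ")");

        // A row containing a null field can now be written without the job failing on it.
        tEnv.executeSql(
                "INSERT INTO dynamo_sink VALUES ('user-1', CAST(NULL AS STRING))");
    }
}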
@@ -27,6 +27,7 @@
import java.util.Properties;

import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;

/** DynamoDb specific configuration. */
@@ -53,6 +54,10 @@ public boolean getFailOnError() {
return tableOptions.get(FAIL_ON_ERROR);
}

public boolean getIgnoreNulls() {
return tableOptions.get(IGNORE_NULLS);
}

public Properties getAsyncSinkProperties() {
return asyncSinkConfigurationValidator.getValidatedConfigurations();
}
@@ -46,6 +46,13 @@ public class DynamoDbConnectorOptions {
.withDescription(
"Determines whether an exception should fail the job, otherwise failed requests are retried.");

public static final ConfigOption<Boolean> IGNORE_NULLS =
ConfigOptions.key("sink.ignore-nulls")
.booleanType()
.defaultValue(false)
.withDescription(
"Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing.");

private DynamoDbConnectorOptions() {
// private constructor to prevent initialization of static class
}
@@ -48,6 +48,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequ

private final String tableName;
private final boolean failOnError;
private final boolean ignoreNulls;
private final Properties dynamoDbClientProperties;
private final DataType physicalDataType;
private final Set<String> overwriteByPartitionKeys;
@@ -60,6 +61,7 @@ protected DynamoDbDynamicSink(
@Nullable Long maxTimeInBufferMS,
String tableName,
boolean failOnError,
boolean ignoreNulls,
Properties dynamoDbClientProperties,
DataType physicalDataType,
Set<String> overwriteByPartitionKeys) {
@@ -71,6 +73,7 @@ protected DynamoDbDynamicSink(
maxTimeInBufferMS);
this.tableName = tableName;
this.failOnError = failOnError;
this.ignoreNulls = ignoreNulls;
this.dynamoDbClientProperties = dynamoDbClientProperties;
this.physicalDataType = physicalDataType;
this.overwriteByPartitionKeys = overwriteByPartitionKeys;
@@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setFailOnError(failOnError)
.setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
.setDynamoDbProperties(dynamoDbClientProperties)
.setElementConverter(new RowDataElementConverter(physicalDataType));
.setElementConverter(
new RowDataElementConverter(physicalDataType, ignoreNulls));

addAsyncOptionsToSinkBuilder(builder);

@@ -106,6 +110,7 @@ public DynamicTableSink copy() {
maxTimeInBufferMS,
tableName,
failOnError,
ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
@@ -133,6 +138,7 @@ public static class DynamoDbDynamicTableSinkBuilder
DynamoDbWriteRequest, DynamoDbDynamicTableSinkBuilder> {
private String tableName;
private boolean failOnError;
private boolean ignoreNulls;
private Properties dynamoDbClientProperties;
private DataType physicalDataType;
private Set<String> overwriteByPartitionKeys;
@@ -147,6 +153,11 @@ public DynamoDbDynamicTableSinkBuilder setFailOnError(boolean failOnError) {
return this;
}

public DynamoDbDynamicTableSinkBuilder setIgnoreNulls(boolean ignoreNulls) {
this.ignoreNulls = ignoreNulls;
return this;
}

public DynamoDbDynamicTableSinkBuilder setDynamoDbClientProperties(
Properties dynamoDbClientProperties) {
this.dynamoDbClientProperties = dynamoDbClientProperties;
@@ -174,6 +185,7 @@ public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
getMaxTimeInBufferMS(),
tableName,
failOnError,
ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
@@ -52,6 +52,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
DynamoDbDynamicSink.builder()
.setTableName(dynamoDbConfiguration.getTableName())
.setFailOnError(dynamoDbConfiguration.getFailOnError())
.setIgnoreNulls(dynamoDbConfiguration.getIgnoreNulls())
.setPhysicalDataType(
catalogTable.getResolvedSchema().toPhysicalRowDataType())
.setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys()))
@@ -35,6 +35,7 @@
@Internal
public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {

private boolean ignoreNulls = false;
private final DataType physicalDataType;
private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter;

@@ -44,11 +45,18 @@ public RowDataElementConverter(DataType physicalDataType) {
new RowDataToAttributeValueConverter(physicalDataType);
}

public RowDataElementConverter(DataType physicalDataType, boolean ignoreNulls) {
this.ignoreNulls = ignoreNulls;
this.physicalDataType = physicalDataType;
this.rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
}

@Override
public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
if (rowDataToAttributeValueConverter == null) {
rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(physicalDataType);
new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
}

DynamoDbWriteRequest.Builder builder =
@@ -43,14 +43,21 @@ public class RowDataToAttributeValueConverter {

private final DataType physicalDataType;
private final TableSchema<RowData> tableSchema;
private boolean ignoreNulls = false;

public RowDataToAttributeValueConverter(DataType physicalDataType) {
this.physicalDataType = physicalDataType;
this.tableSchema = createTableSchema();
}

public RowDataToAttributeValueConverter(DataType physicalDataType, boolean ignoreNulls) {
this.physicalDataType = physicalDataType;
this.tableSchema = createTableSchema();
this.ignoreNulls = ignoreNulls;
}

public Map<String, AttributeValue> convertRowData(RowData row) {
return tableSchema.itemToMap(row, false);
return tableSchema.itemToMap(row, ignoreNulls);
}

private StaticTableSchema<RowData> createTableSchema() {
@@ -85,7 +92,7 @@ private StaticTableSchema.Builder<RowData> addAttribute(
rowData ->
DataStructureConverters.getConverter(
field.getDataType())
.toExternal(
.toExternalOrNull(
fieldGetter.getFieldOrNull(
rowData)))
.setter(((rowData, t) -> {})));
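To make the effect of the converter change concrete, here is a minimal sketch, assuming it is placed in the same package as RowDataToAttributeValueConverter (its package is not shown in this diff); the row layout and values are placeholders. With ignoreNulls set to true, a field whose value is null should simply be absent from the resulting attribute map, mirroring the testCharNull case below.

import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class IgnoreNullsConversionSketch {
    public static void main(String[] args) {
        // Two STRING fields; the second one is left null in the sample row.
        DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("partition_key", DataTypes.STRING()),
                        DataTypes.FIELD("payload", DataTypes.STRING()));

        RowDataToAttributeValueConverter converter =
                new RowDataToAttributeValueConverter(rowType, true);

        // The null 'payload' field is dropped from the item instead of making the
        // write request fail, which is the problem addressed by FLINK-36428.
        GenericRowData row = GenericRowData.of(StringData.fromString("pk-1"), null);

        Map<String, AttributeValue> item = converter.convertRowData(row);
        System.out.println(item.keySet()); // expected: [partition_key]
    }
}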
@@ -47,6 +47,7 @@
import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.AWS_REGION;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.assertj.core.api.Assertions.assertThat;
@@ -137,7 +138,10 @@ void testCopyTableSink() {
void testGoodTableSinkWithOptionalOptions() {
ResolvedSchema sinkSchema = defaultSinkSchema();
Map<String, String> sinkOptions =
defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build();
defaultSinkOptions()
.withTableOption(FAIL_ON_ERROR, "true")
.withTableOption(IGNORE_NULLS, "true")
.build();
List<String> partitionKeys = Collections.singletonList("partition_key");

// Construct actual sink
@@ -153,6 +157,7 @@ void testGoodTableSinkWithOptionalOptions() {
.setDynamoDbClientProperties(defaultSinkProperties())
.setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
.setFailOnError(true)
.setIgnoreNulls(true)
.build();

assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink);
@@ -61,6 +61,19 @@ void testChar() {
assertThat(actualResult).containsAllEntriesOf(expectedResult);
}

@Test
void testCharNull() {
String key = "key";

DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.CHAR(9)));
RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(dataType, true);
Map<String, AttributeValue> actualResult =
rowDataToAttributeValueConverter.convertRowData(createElement(null));

assertThat(actualResult).isEmpty();
}

@Test
void testVarChar() {
String key = "key";
