Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36428][Connectors/DynamoDB] DynamoDb Table API Sink fails when null value in the RowData #173

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ Connector Options
<td>Boolean</td>
<td>Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job.</td>
</tr>
<tr>
<td><h5>sink.ignore-nulls</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Determines whether null values should be ignored in the sink. If set to true, fields whose value is null are omitted from the item written to DynamoDB.</td>
</tr>
</tbody>
<thead>
<tr>
Expand Down
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ Connector Options
<td>Boolean</td>
<td>Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job.</td>
</tr>
<tr>
<td><h5>sink.ignore-nulls</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Determines whether null values should be ignored in the sink. If set to true, fields whose value is null are omitted from the item written to DynamoDB.</td>
</tr>
</tbody>
<thead>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Properties;

import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;

/** DynamoDb specific configuration. */
Expand All @@ -53,6 +54,10 @@ public boolean getFailOnError() {
return tableOptions.get(FAIL_ON_ERROR);
}

/** Returns the value configured for the {@code sink.ignore-nulls} table option. */
public boolean getIgnoreNulls() {
    final boolean ignoreNulls = tableOptions.get(IGNORE_NULLS);
    return ignoreNulls;
}

/** Returns the configurations produced by the async sink configuration validator. */
public Properties getAsyncSinkProperties() {
    final Properties validated = asyncSinkConfigurationValidator.getValidatedConfigurations();
    return validated;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public class DynamoDbConnectorOptions {
.withDescription(
"Determines whether an exception should fail the job, otherwise failed requests are retried.");

/** Boolean table option {@code sink.ignore-nulls}; defaults to {@code false} when not set. */
public static final ConfigOption<Boolean> IGNORE_NULLS =
ConfigOptions.key("sink.ignore-nulls")
.booleanType()
.defaultValue(false)
.withDescription(
"Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing.");

private DynamoDbConnectorOptions() {
// Private on purpose: this class only holds static option constants and must not be instantiated.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequ

private final String tableName;
private final boolean failOnError;
private final boolean ignoreNulls;
private final Properties dynamoDbClientProperties;
private final DataType physicalDataType;
private final Set<String> overwriteByPartitionKeys;
Expand All @@ -60,6 +61,7 @@ protected DynamoDbDynamicSink(
@Nullable Long maxTimeInBufferMS,
String tableName,
boolean failOnError,
boolean ignoreNulls,
Properties dynamoDbClientProperties,
DataType physicalDataType,
Set<String> overwriteByPartitionKeys) {
Expand All @@ -71,6 +73,7 @@ protected DynamoDbDynamicSink(
maxTimeInBufferMS);
this.tableName = tableName;
this.failOnError = failOnError;
this.ignoreNulls = ignoreNulls;
this.dynamoDbClientProperties = dynamoDbClientProperties;
this.physicalDataType = physicalDataType;
this.overwriteByPartitionKeys = overwriteByPartitionKeys;
Expand All @@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setFailOnError(failOnError)
.setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
.setDynamoDbProperties(dynamoDbClientProperties)
.setElementConverter(new RowDataElementConverter(physicalDataType));
.setElementConverter(
new RowDataElementConverter(physicalDataType, ignoreNulls));

addAsyncOptionsToSinkBuilder(builder);

Expand All @@ -106,6 +110,7 @@ public DynamicTableSink copy() {
maxTimeInBufferMS,
tableName,
failOnError,
ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
Expand Down Expand Up @@ -133,6 +138,7 @@ public static class DynamoDbDynamicTableSinkBuilder
DynamoDbWriteRequest, DynamoDbDynamicTableSinkBuilder> {
private String tableName;
private boolean failOnError;
private boolean ignoreNulls;
private Properties dynamoDbClientProperties;
private DataType physicalDataType;
private Set<String> overwriteByPartitionKeys;
Expand All @@ -147,6 +153,11 @@ public DynamoDbDynamicTableSinkBuilder setFailOnError(boolean failOnError) {
return this;
}

/**
 * Stores the ignore-nulls flag that {@code build()} passes on to the created sink.
 *
 * @param ignoreNulls flag value to use for the sink being built
 * @return this builder, to allow call chaining
 */
public DynamoDbDynamicTableSinkBuilder setIgnoreNulls(boolean ignoreNulls) {
this.ignoreNulls = ignoreNulls;
return this;
}

public DynamoDbDynamicTableSinkBuilder setDynamoDbClientProperties(
Properties dynamoDbClientProperties) {
this.dynamoDbClientProperties = dynamoDbClientProperties;
Expand Down Expand Up @@ -174,6 +185,7 @@ public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
getMaxTimeInBufferMS(),
tableName,
failOnError,
ignoreNulls,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
DynamoDbDynamicSink.builder()
.setTableName(dynamoDbConfiguration.getTableName())
.setFailOnError(dynamoDbConfiguration.getFailOnError())
.setIgnoreNulls(dynamoDbConfiguration.getIgnoreNulls())
.setPhysicalDataType(
catalogTable.getResolvedSchema().toPhysicalRowDataType())
.setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
@Internal
public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {

private boolean ignoreNulls = false;
private final DataType physicalDataType;
private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter;

Expand All @@ -44,11 +45,18 @@ public RowDataElementConverter(DataType physicalDataType) {
new RowDataToAttributeValueConverter(physicalDataType);
}

/**
 * Creates an element converter for rows of the given physical data type.
 *
 * @param physicalDataType data type handed to the underlying row-to-attribute converter
 * @param ignoreNulls flag forwarded to the underlying {@link RowDataToAttributeValueConverter}
 */
public RowDataElementConverter(DataType physicalDataType, boolean ignoreNulls) {
    this.physicalDataType = physicalDataType;
    this.ignoreNulls = ignoreNulls;
    // Created eagerly here; the field is transient, and apply() recreates it whenever it is null.
    this.rowDataToAttributeValueConverter =
            new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
}

@Override
public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
if (rowDataToAttributeValueConverter == null) {
rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(physicalDataType);
new RowDataToAttributeValueConverter(physicalDataType, ignoreNulls);
}

DynamoDbWriteRequest.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ public class RowDataToAttributeValueConverter {

private final DataType physicalDataType;
private final TableSchema<RowData> tableSchema;
private boolean ignoreNulls = false;

/**
 * Creates a converter that keeps the {@code ignoreNulls} flag at {@code false}.
 *
 * @param physicalDataType data type used to build the table schema
 */
public RowDataToAttributeValueConverter(DataType physicalDataType) {
    this(physicalDataType, false);
}

/**
 * Creates a converter with an explicit {@code ignoreNulls} setting.
 *
 * @param physicalDataType data type stored on the converter before the table schema is created
 * @param ignoreNulls flag stored on the converter for use when rows are converted
 */
public RowDataToAttributeValueConverter(DataType physicalDataType, boolean ignoreNulls) {
this.physicalDataType = physicalDataType;
// The schema is created after physicalDataType is assigned and before ignoreNulls is stored.
this.tableSchema = createTableSchema();
this.ignoreNulls = ignoreNulls;
}

/**
 * Converts a row into a map of attribute values using this converter's table schema.
 *
 * @param row the row to convert
 * @return the result of {@code tableSchema.itemToMap} for the row, called with this
 *     converter's {@code ignoreNulls} flag
 */
public Map<String, AttributeValue> convertRowData(RowData row) {
    // Use the configured flag; a leftover hardcoded `false` return would bypass it
    // and leave this statement unreachable.
    return tableSchema.itemToMap(row, ignoreNulls);
}

private StaticTableSchema<RowData> createTableSchema() {
Expand Down Expand Up @@ -85,7 +92,7 @@ private StaticTableSchema.Builder<RowData> addAttribute(
rowData ->
DataStructureConverters.getConverter(
field.getDataType())
.toExternal(
.toExternalOrNull(
fieldGetter.getFieldOrNull(
rowData)))
.setter(((rowData, t) -> {})));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.AWS_REGION;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.IGNORE_NULLS;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -137,7 +138,10 @@ void testCopyTableSink() {
void testGoodTableSinkWithOptionalOptions() {
ResolvedSchema sinkSchema = defaultSinkSchema();
Map<String, String> sinkOptions =
defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build();
defaultSinkOptions()
.withTableOption(FAIL_ON_ERROR, "true")
.withTableOption(IGNORE_NULLS, "true")
.build();
List<String> partitionKeys = Collections.singletonList("partition_key");

// Construct actual sink
Expand All @@ -153,6 +157,7 @@ void testGoodTableSinkWithOptionalOptions() {
.setDynamoDbClientProperties(defaultSinkProperties())
.setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
.setFailOnError(true)
.setIgnoreNulls(true)
.build();

assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ void testChar() {
assertThat(actualResult).containsAllEntriesOf(expectedResult);
}

@Test
void testCharNull() {
    String key = "key";

    DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.CHAR(9)));
    RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
            new RowDataToAttributeValueConverter(dataType, true);
    Map<String, AttributeValue> actualResult =
            rowDataToAttributeValueConverter.convertRowData(createElement(null));

    // Assert on the map itself so a failure reports the unexpected entries
    // instead of a bare "expected true but was false".
    assertThat(actualResult).isEmpty();
}

@Test
void testVarChar() {
String key = "key";
Expand Down
Loading