Skip to content

Commit

Permalink
Convert Number types to BigDecimal plainString for consistency betwee…
Browse files Browse the repository at this point in the history
…n partition and sort keys for export and streams (opensearch-project#3650)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 14, 2023
1 parent 3322c8a commit 5b5ddae
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;

Expand Down Expand Up @@ -56,7 +57,11 @@ public RecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator,
*/
private String getAttributeValue(final Map<String, Object> data, String attributeName) {
if (data.containsKey(attributeName)) {
return String.valueOf(data.get(attributeName));
final Object value = data.get(attributeName);
if (value instanceof Number) {
return new BigDecimal(value.toString()).toPlainString();
}
return String.valueOf(value);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -26,6 +29,7 @@
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -44,9 +48,9 @@
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -164,4 +168,41 @@ void test_writeSingleRecordToBuffer() throws Exception {
verify(bytesReceivedSummary, times(1)).record(line.getBytes().length);
verify(bytesProcessedSummary, times(1)).record(line.getBytes().length);
}

@ParameterizedTest
@MethodSource("decimalFormatKeysArgumentProvider")
void writing_record_to_buffer_with_ion_formatted_decimals_creates_expected_partition_and_sort_key(
final String partitionKey,
final String sortKey,
final String expectedPartitionKey,
final String expectedSortKey
) throws Exception {

final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
final String line = " $ion_1_0 {Item:{PK:" + partitionKey + ",SK:" + sortKey + "}}";

ExportRecordConverter objectUnderTest = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture());

objectUnderTest.writeToBuffer(eq(null), List.of(line));
verify(bufferAccumulator).add(any(Record.class));
verify(bufferAccumulator).flush();
assertThat(recordArgumentCaptor.getValue().getData(), notNullValue());
JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData();

assertThat(event.getMetadata(), notNullValue());

assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(expectedPartitionKey));
assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(expectedSortKey));
assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(expectedPartitionKey + "|" + expectedSortKey));
}

private static Stream<Arguments> decimalFormatKeysArgumentProvider() {
return Stream.of(
Arguments.of("86067d1", "39.29", "860670", "39.29"),
Arguments.of("212d9", "0d0", "212000000000", "0"),
Arguments.of("0.", "4.2d1", "0", "42"),
Arguments.of("0.420d2", "42d0", "42.0", "42")
);
}
}

0 comments on commit 5b5ddae

Please sign in to comment.