Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAINT: add bytes metrics into dynamo source #3647

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
Expand All @@ -28,6 +29,8 @@ public class ExportRecordConverter extends RecordConverter {

static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed";
static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordProcessingErrors";
static final String BYTES_RECEIVED = "bytesReceived";
static final String BYTES_PROCESSED = "bytesProcessed";


IonObjectMapper MAPPER = new IonObjectMapper();
Expand All @@ -36,13 +39,16 @@ public class ExportRecordConverter extends RecordConverter {

private final Counter exportRecordSuccessCounter;
private final Counter exportRecordErrorCounter;
private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;

// Creates a converter that turns DynamoDB export lines into events written to the
// buffer, registering its processing metrics against the supplied PluginMetrics.
public ExportRecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator, TableInfo tableInfo, PluginMetrics pluginMetrics) {
super(bufferAccumulator, tableInfo);
this.pluginMetrics = pluginMetrics;
// Counters for export records successfully added to the buffer vs. processing failures.
this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT);

// Distribution summaries recording per-record byte sizes: bytesReceived for every
// line seen, bytesProcessed only for lines successfully written to the buffer.
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
}

private Map<String, Object> convertToMap(String jsonData) {
Expand All @@ -63,9 +69,12 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Stri

int eventCount = 0;
for (String line : lines) {
final long bytes = line.getBytes().length;
bytesReceivedSummary.record(bytes);
Map data = (Map<String, Object>) convertToMap(line).get(ITEM_KEY);
try {
addToBuffer(acknowledgementSet, data);
bytesProcessedSummary.record(bytes);
eventCount++;
} catch (Exception e) {
// will this cause too many logs?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
Expand All @@ -31,6 +32,8 @@ public class StreamRecordConverter extends RecordConverter {

static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed";
static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors";
static final String BYTES_RECEIVED = "bytesReceived";
static final String BYTES_PROCESSED = "bytesProcessed";

private static final ObjectMapper MAPPER = new ObjectMapper();

Expand All @@ -41,6 +44,8 @@ public class StreamRecordConverter extends RecordConverter {

private final Counter changeEventSuccessCounter;
private final Counter changeEventErrorCounter;
private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;

private Instant currentSecond;
private int recordsSeenThisSecond = 0;
Expand All @@ -50,6 +55,8 @@ public StreamRecordConverter(final BufferAccumulator<org.opensearch.dataprepper.
this.pluginMetrics = pluginMetrics;
this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT);
this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
}

@Override
Expand All @@ -62,14 +69,17 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco

int eventCount = 0;
for (Record record : records) {
final long bytes = record.dynamodb().sizeBytes();
// NewImage may be empty
Map<String, Object> data = convertData(record.dynamodb().newImage());
// Always get keys from dynamodb().keys()
Map<String, Object> keys = convertKeys(record.dynamodb().keys());

try {
bytesReceivedSummary.record(bytes);
final long eventCreationTimeMillis = calculateTieBreakingVersionFromTimestamp(record.dynamodb().approximateCreationDateTime());
addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), eventCreationTimeMillis, record.eventNameAsString());
bytesProcessedSummary.record(bytes);
eventCount++;
} catch (Exception e) {
// will this cause too many logs?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -38,6 +39,8 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.BYTES_PROCESSED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.BYTES_RECEIVED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -65,6 +68,12 @@ class ExportRecordConverterTest {
@Mock
private Counter exportRecordErrors;

@Mock
private DistributionSummary bytesReceivedSummary;

@Mock
private DistributionSummary bytesProcessedSummary;


private final String tableName = UUID.randomUUID().toString();
private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName;
Expand All @@ -87,6 +96,8 @@ void setup() {

given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT)).willReturn(exportRecordSuccess);
given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT)).willReturn(exportRecordErrors);
given(pluginMetrics.summary(BYTES_RECEIVED)).willReturn(bytesReceivedSummary);
given(pluginMetrics.summary(BYTES_PROCESSED)).willReturn(bytesProcessedSummary);

}

Expand Down Expand Up @@ -116,7 +127,8 @@ void test_writeToBuffer() throws Exception {
verify(exportRecordSuccess).increment(anyDouble());

verifyNoInteractions(exportRecordErrors);

verify(bytesReceivedSummary, times(numberOfRecords)).record(anyDouble());
verify(bytesProcessedSummary, times(numberOfRecords)).record(anyDouble());
}

@Test
Expand Down Expand Up @@ -149,5 +161,7 @@ void test_writeSingleRecordToBuffer() throws Exception {
assertThat(event.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(0L));
assertThat(event.getEventHandle(), notNullValue());
assertThat(event.getEventHandle().getExternalOriginationTime(), nullValue());
verify(bytesReceivedSummary, times(1)).record(line.getBytes().length);
verify(bytesProcessedSummary, times(1)).record(line.getBytes().length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -48,11 +49,15 @@
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_PROCESSED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_RECEIVED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSING_ERROR_COUNT;

@ExtendWith(MockitoExtension.class)
class StreamRecordConverterTest {
private static final Random RANDOM = new Random();

@Mock
private PluginMetrics pluginMetrics;

Expand All @@ -67,6 +72,12 @@ class StreamRecordConverterTest {
@Mock
private Counter changeEventErrorCounter;

@Mock
private DistributionSummary bytesReceivedSummary;

@Mock
private DistributionSummary bytesProcessedSummary;


private final String tableName = UUID.randomUUID().toString();
private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName;
Expand All @@ -89,6 +100,8 @@ void setup() {

given(pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT)).willReturn(changeEventSuccessCounter);
given(pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT)).willReturn(changeEventErrorCounter);
given(pluginMetrics.summary(BYTES_RECEIVED)).willReturn(bytesReceivedSummary);
given(pluginMetrics.summary(BYTES_PROCESSED)).willReturn(bytesProcessedSummary);

}

Expand All @@ -110,6 +123,8 @@ void test_writeToBuffer() throws Exception {
verify(changeEventSuccessCounter).increment(anyDouble());

verifyNoInteractions(changeEventErrorCounter);
verify(bytesReceivedSummary, times(numberOfRecords)).record(anyDouble());
verify(bytesProcessedSummary, times(numberOfRecords)).record(anyDouble());

}

Expand Down Expand Up @@ -141,6 +156,8 @@ void test_writeSingleRecordToBuffer() throws Exception {
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));

verifyNoInteractions(changeEventErrorCounter);
verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes());
verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes());
}

@Test
Expand Down Expand Up @@ -230,6 +247,8 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta
assertThat(fourthEventWithNewerSecond.getEventHandle().getExternalOriginationTime(), equalTo(newerSecond));

verifyNoInteractions(changeEventErrorCounter);
verify(bytesReceivedSummary, times(4)).record(anyDouble());
verify(bytesProcessedSummary, times(4)).record(anyDouble());
}

private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(int count, final Instant creationTime) {
Expand All @@ -246,6 +265,7 @@ private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final
partitionKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build(),
sortKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build());
StreamRecord streamRecord = StreamRecord.builder()
.sizeBytes(RANDOM.nextLong())
.newImage(data)
.keys(data)
.sequenceNumber(UUID.randomUUID().toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.stream;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -58,6 +59,7 @@

@ExtendWith(MockitoExtension.class)
class ShardConsumerTest {
private static final Random RANDOM = new Random();

@Mock
private EnhancedSourceCoordinator coordinator;
Expand All @@ -79,6 +81,9 @@ class ShardConsumerTest {
@Mock
private Counter testCounter;

@Mock
private DistributionSummary testSummary;


private StreamCheckpointer checkpointer;

Expand Down Expand Up @@ -142,6 +147,7 @@ void setup() throws Exception {
when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response);

given(pluginMetrics.counter(anyString())).willReturn(testCounter);
given(pluginMetrics.summary(anyString())).willReturn(testSummary);
}


Expand Down Expand Up @@ -220,6 +226,7 @@ private List<Record> buildRecords(int count) {
sortKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build());

StreamRecord streamRecord = StreamRecord.builder()
.sizeBytes(RANDOM.nextLong())
.newImage(data)
.sequenceNumber(UUID.randomUUID().toString())
.approximateCreationDateTime(Instant.now())
Expand Down
Loading