Skip to content

Commit

Permalink
Implemented additional metrics for the S3 source. (opensearch-project…
Browse files Browse the repository at this point in the history
…#2028)

Implemented additional metrics for the S3 source. Resolves opensearch-project#2024

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Nov 22, 2022
1 parent 729c2d4 commit 83e64d8
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 22 deletions.
8 changes: 8 additions & 0 deletions data-prepper-plugins/s3-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ The AWS configuration is the same for both SQS and S3.
### Counters

* `s3ObjectsFailed` - The number of S3 objects that the S3 Source failed to read.
* `s3ObjectsNotFound` - The number of S3 objects that the S3 Source failed to read due to a Not Found error from S3. These are also counted toward `s3ObjectsFailed`.
* `s3ObjectsAccessDenied` - The number of S3 objects that the S3 Source failed to read due to an Access Denied or Forbidden error. These are also counted toward `s3ObjectsFailed`.
* `s3ObjectsSucceeded` - The number of S3 objects that the S3 Source successfully read.
* `sqsMessagesReceived` - The number of SQS messages received from the queue by the S3 Source.
* `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the S3 Source.
Expand All @@ -99,6 +101,12 @@ The AWS configuration is the same for both SQS and S3.
* `s3ObjectReadTimeElapsed` - Measures the time the S3 Source takes to perform a request to GET an S3 object, parse it, and write Events to the buffer.
* `sqsMessageDelay` - Measures the time from when S3 records an event time for the creation of an object to when it was fully parsed.

### Distribution Summaries

* `s3ObjectSizeBytes` - Measures the size of S3 objects as reported by the S3 `Content-Length`. For compressed objects, this is the compressed size.
* `s3ObjectProcessedBytes` - Measures the bytes processed by the S3 source for a given object. For compressed objects, this is the uncompressed size.
* `s3ObjectsEvents` - Measures the number of events (sometimes called records) produced by an S3 object.

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source;

import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -77,8 +78,10 @@ void setUp() {

pluginMetrics = mock(PluginMetrics.class);
final Counter counter = mock(Counter.class);
final DistributionSummary distributionSummary = mock(DistributionSummary.class);
final Timer timer = new NoopTimer(new Meter.Id("test", Tags.empty(), null, null, Meter.Type.TIMER));
when(pluginMetrics.counter(anyString())).thenReturn(counter);
when(pluginMetrics.summary(anyString())).thenReturn(distributionSummary);
when(pluginMetrics.timer(anyString())).thenReturn(timer);

bucketOwnerProvider = b -> Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source;

import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;
Expand Down Expand Up @@ -61,9 +62,11 @@ void setUp() {

pluginMetrics = mock(PluginMetrics.class);
final Counter sharedCounter = mock(Counter.class);
final DistributionSummary distributionSummary = mock(DistributionSummary.class);
final Timer sqsMessageDelayTimer = mock(Timer.class);

when(pluginMetrics.counter(anyString())).thenReturn(sharedCounter);
when(pluginMetrics.summary(anyString())).thenReturn(distributionSummary);
when(pluginMetrics.timer(anyString())).thenReturn(sqsMessageDelayTimer);

final SqsOptions sqsOptions = mock(SqsOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BufferAccumulator<T extends Record<?>> {
private final Buffer<T> buffer;
private final int numberOfRecordsToAccumulate;
private final int bufferTimeoutMillis;
private int totalWritten = 0;

private final Collection<T> recordsAccumulated;

Expand Down Expand Up @@ -55,9 +56,20 @@ void flush() throws Exception {
}

private void flushAccumulatedToBuffer() throws Exception {
if (recordsAccumulated.size() > 0) {
final int currentRecordCountAccumulated = recordsAccumulated.size();
if (currentRecordCountAccumulated > 0) {
buffer.writeAll(recordsAccumulated, bufferTimeoutMillis);
recordsAccumulated.clear();
totalWritten += currentRecordCountAccumulated;
}
}

/**
 * Gets the total number of records written to the buffer.
 * <p>
 * Only records that have been successfully flushed to the buffer are
 * counted; records still accumulating (not yet flushed) are excluded.
 *
 * @return the total number of records written
 */
public int getTotalWritten() {
return totalWritten;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@

package org.opensearch.dataprepper.plugins.source;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.compress.utils.CountingInputStream;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.codec.Codec;
import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine;
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
Expand All @@ -34,8 +37,13 @@
class S3ObjectWorker {
private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class);
static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed";
static final String S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME = "s3ObjectsNotFound";
static final String S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED = "s3ObjectsAccessDenied";
static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded";
static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed";
static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes";
static final String S3_OBJECTS_SIZE_PROCESSED = "s3ObjectProcessedBytes";
static final String S3_OBJECTS_EVENTS = "s3ObjectsEvents";

private final S3Client s3Client;
private final Buffer<Record<Event>> buffer;
Expand All @@ -46,8 +54,13 @@ class S3ObjectWorker {
private final int numberOfRecordsToAccumulate;
private final BiConsumer<Event, S3ObjectReference> eventConsumer;
private final Counter s3ObjectsFailedCounter;
private final Counter s3ObjectsFailedNotFoundCounter;
private final Counter s3ObjectsFailedAccessDeniedCounter;
private final Counter s3ObjectsSucceededCounter;
private final Timer s3ObjectReadTimer;
private final DistributionSummary s3ObjectSizeSummary;
private final DistributionSummary s3ObjectSizeProcessedSummary;
private final DistributionSummary s3ObjectEventsSummary;

public S3ObjectWorker(final S3Client s3Client,
final Buffer<Record<Event>> buffer,
Expand All @@ -68,8 +81,13 @@ public S3ObjectWorker(final S3Client s3Client,
this.eventConsumer = eventConsumer;

s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
s3ObjectsFailedNotFoundCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME);
s3ObjectsFailedAccessDeniedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED);
s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME);
s3ObjectReadTimer = pluginMetrics.timer(S3_OBJECTS_TIME_ELAPSED_METRIC_NAME);
s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);
s3ObjectSizeProcessedSummary = pluginMetrics.summary(S3_OBJECTS_SIZE_PROCESSED);
s3ObjectEventsSummary = pluginMetrics.summary(S3_OBJECTS_EVENTS);
}

void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException {
Expand Down Expand Up @@ -99,8 +117,12 @@ void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException
}

private void doParseObject(final S3ObjectReference s3ObjectReference, final GetObjectRequest getObjectRequest, final BufferAccumulator<Record<Event>> bufferAccumulator) throws IOException {
final long s3ObjectSize;
final long totalBytesRead;

try (final ResponseInputStream<GetObjectResponse> responseInputStream = s3Client.getObject(getObjectRequest);
final InputStream inputStream = compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream)) {
final CountingInputStream inputStream = new CountingInputStream(compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream))) {
s3ObjectSize = responseInputStream.response().contentLength();
codec.parse(inputStream, record -> {
try {
eventConsumer.accept(record.getData(), s3ObjectReference);
Expand All @@ -109,16 +131,33 @@ private void doParseObject(final S3ObjectReference s3ObjectReference, final GetO
LOG.error("Failed writing S3 objects to buffer.", e);
}
});
} catch (final Exception e) {
LOG.error("Error reading from S3 object: s3ObjectReference={}.", s3ObjectReference, e);
totalBytesRead = inputStream.getBytesRead();
} catch (final Exception ex) {
LOG.error("Error reading from S3 object: s3ObjectReference={}.", s3ObjectReference, ex);
s3ObjectsFailedCounter.increment();
throw e;
if(ex instanceof S3Exception) {
recordS3Exception((S3Exception) ex);
}
throw ex;
}

try {
bufferAccumulator.flush();
} catch (final Exception e) {
LOG.error("Failed writing S3 objects to buffer.", e);
}

s3ObjectSizeSummary.record(s3ObjectSize);
s3ObjectSizeProcessedSummary.record(totalBytesRead);
s3ObjectEventsSummary.record(bufferAccumulator.getTotalWritten());
}

/**
 * Increments the status-specific failure counter for an S3 error response.
 * Only 404 (Not Found) and 403 (Forbidden) responses are tracked by a
 * dedicated counter; other status codes get no additional counter here.
 *
 * @param ex the S3 service exception whose HTTP status code is inspected
 */
private void recordS3Exception(final S3Exception ex) {
    final int statusCode = ex.statusCode();
    if (statusCode == HttpStatusCode.NOT_FOUND) {
        s3ObjectsFailedNotFoundCounter.increment();
    } else if (statusCode == HttpStatusCode.FORBIDDEN) {
        s3ObjectsFailedAccessDeniedCounter.increment();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,66 @@ void flush_after_add_writes_to_buffer() throws Exception {
assertThat(actualRecordsWritten, equalTo(Collections.singletonList(record)));
}

@Test
void getTotalWritten_returns_zero_if_no_writes() throws Exception {
    // A freshly created accumulator has flushed nothing yet.
    final BufferAccumulator objectUnderTest = createObjectUnderTest();

    assertThat(objectUnderTest.getTotalWritten(), equalTo(0));
}

@Test
void getTotalWritten_returns_accumulated_after_single_write() throws Exception {
    // Add one record, flush explicitly, and expect a total of exactly one.
    final BufferAccumulator objectUnderTest = createObjectUnderTest();
    objectUnderTest.add(createRecord());

    objectUnderTest.flush();

    assertThat(objectUnderTest.getTotalWritten(), equalTo(1));
}

/**
 * Filling the accumulator to exactly the configured batch size should make
 * the records count toward the total without an explicit flush() call
 * (presumably via an automatic flush when the batch fills — the add() path
 * is not visible here; confirm against BufferAccumulator.add).
 * Renamed from a duplicate of the single-write test above to describe what
 * it actually verifies.
 */
@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 20})
void getTotalWritten_returns_accumulated_after_filling_single_batch(final int recordsInWrite) throws Exception {
    recordsToAccumulate = recordsInWrite;
    final BufferAccumulator objectUnderTest = createObjectUnderTest();

    for (int i = 0; i < recordsInWrite; i++) {
        objectUnderTest.add(createRecord());
    }

    assertThat(objectUnderTest.getTotalWritten(), equalTo(recordsInWrite));
}

/**
 * Performs several full-batch writes and expects the total to equal
 * writes x batch size. The parameter was previously named recordsInWrite,
 * which misleadingly suggested records-per-write; it actually counts the
 * number of complete batches written.
 */
@ParameterizedTest
@ValueSource(ints = {2, 10, 20})
void getTotalWritten_returns_accumulated_after_multiple_writes(final int numberOfWrites) throws Exception {
    recordsToAccumulate = 10;
    final BufferAccumulator objectUnderTest = createObjectUnderTest();

    // An empty flush must not contribute to the total.
    objectUnderTest.flush();

    for (int writes = 0; writes < numberOfWrites; writes++) {
        for (int r = 0; r < recordsToAccumulate; r++) {
            objectUnderTest.add(createRecord());
        }
    }

    assertThat(objectUnderTest.getTotalWritten(), equalTo(numberOfWrites * recordsToAccumulate));
}

/**
 * Accumulates a partial batch, flushes explicitly, and expects the total
 * to match the number of records added.
 */
@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 15})
void getTotalWritten_returns_flushed_data(final int accumulationCount) throws Exception {
    final BufferAccumulator objectUnderTest = createObjectUnderTest();

    for (int added = 0; added < accumulationCount; added++) {
        objectUnderTest.add(createRecord());
    }
    objectUnderTest.flush();

    assertThat(objectUnderTest.getTotalWritten(), equalTo(accumulationCount));
}

// Creates a fresh mock Record; the tests only count records written, so no
// stubbing of the mock is needed.
private Record createRecord() {
return mock(Record.class);
}
Expand Down
Loading

0 comments on commit 83e64d8

Please sign in to comment.