Skip to content

Commit

Permalink
Maintenance: remove support for non event data model (opensearch-proj…
Browse files Browse the repository at this point in the history
…ect#1689)

* MAINT: remove support for OTLP data transport

Signed-off-by: Qi Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Sep 2, 2022
1 parent 23911dd commit 7bc0d67
Show file tree
Hide file tree
Showing 82 changed files with 157 additions and 5,470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,5 @@ jobs:
java-version: 11
- name: Checkout Data-Prepper
uses: actions/checkout@v2
# TODO: Event record type only in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272)
- name: Run raw-span OTLP record type latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanOTLPLatestReleaseCompatibilityEndToEndTest
- name: Run raw-span Event record type latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanEventLatestReleaseCompatibilityEndToEndTest
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanLatestReleaseCompatibilityEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,5 @@ jobs:
java-version: 11
- name: Checkout Data-Prepper
uses: actions/checkout@v2
# TODO: Event record type only in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272)
- name: Run raw-span OTLP record type end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanOTLPEndToEndTest
- name: Run raw-span OTLP and Event end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanOTLPAndEventEndToEndTest
- name: Run raw-span end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,5 @@ jobs:
java-version: 11
- name: Checkout Data-Prepper
uses: actions/checkout@v2
# TODO: Event record type only in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272)
- name: Run service-map OTLP record type end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:serviceMapOTLPEndToEndTest
- name: Run service-map OTLP and Event end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:serviceMapOTLPAndEventEndToEndTest
- name: Run service-map end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:serviceMapEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
Expand All @@ -60,9 +55,7 @@
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
Expand Down Expand Up @@ -158,16 +151,15 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I
RuntimeException.class, () -> new OpenSearchSink(pluginSetting));
}

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanDefault(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
@Test
public void testOutputRawSpanDefault() throws IOException, InterruptedException {
final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData1 = mapper.readValue(testDoc1, Map.class);
@SuppressWarnings("unchecked") final Map<String, Object> expData2 = mapper.readValue(testDoc2, Map.class);

final List<Record<Object>> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2));
final List<Record<Event>> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);
sink.output(testRecords);
Expand Down Expand Up @@ -223,15 +215,15 @@ public void testOutputRawSpanDefault(final Function<String, Record> stringToReco
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0));
}

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanWithDLQ(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
@Test
public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException {
// TODO: write test case
final String testDoc1 = readDocFromFile("raw-span-error.json");
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc2, Map.class);
final List<Record<Object>> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2));

final List<Record<Event>> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
// generate temporary directory for dlq file
final File tempDirectory = Files.createTempDirectory("").toFile();
Expand Down Expand Up @@ -299,14 +291,13 @@ public void testInstantiateSinkServiceMapDefault() throws IOException {
}
}

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputServiceMapDefault(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
@Test
public void testOutputServiceMapDefault() throws IOException, InterruptedException {
final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc, Map.class);

final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(testDoc));
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(testDoc));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null);
OpenSearchSink sink = new OpenSearchSink(pluginSetting);
sink.output(testRecords);
Expand Down Expand Up @@ -463,15 +454,14 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates() throws IOEx

}

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputCustomIndex(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
@Test
public void testOutputCustomIndex() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testId = "foo";
final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(generateCustomRecordJson(testIdField, testId)));
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile);
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);
Expand All @@ -490,15 +480,14 @@ public void testOutputCustomIndex(final Function<String, Record> stringToRecord)
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);
}

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testBulkActionCreate(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
@Test
public void testBulkActionCreate() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testId = "foo";
final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(generateCustomRecordJson(testIdField, testId)));
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile);
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
pluginSetting.getSettings().put(IndexConfiguration.ACTION, BulkAction.CREATE.toString());
Expand Down Expand Up @@ -526,7 +515,7 @@ public void testEventOutput() throws IOException, InterruptedException {
.withEventType("event")
.build();

final List<Record<Object>> testRecords = Collections.singletonList(new Record<>(testEvent));
final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));

final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);
Expand All @@ -543,10 +532,9 @@ public void testEventOutput() throws IOException, InterruptedException {
sink.shutdown();
}

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
public void testOutputManagementDisabled(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
public void testOutputManagementDisabled() throws IOException, InterruptedException {
final String testIndexAlias = "test-" + UUID.randomUUID();
final String roleName = UUID.randomUUID().toString();
final String username = UUID.randomUUID().toString();
Expand All @@ -558,7 +546,7 @@ public void testOutputManagementDisabled(final Function<String, Record> stringTo
final String testIdField = "someId";
final String testId = "foo";

final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(generateCustomRecordJson(testIdField, testId)));
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));

final Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, null);
metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.MANAGEMENT_DISABLED.getValue());
Expand Down Expand Up @@ -722,33 +710,12 @@ private void wipeAllOpenSearchIndices() throws IOException {
});
}

/**
* Provides a function for mapping a String to a Record to allow the tests to run
* against both String and Event models.
*/
static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
final ObjectMapper objectMapper = new ObjectMapper();
final Function<String, Record> stringModel = jsonString -> {
try {
// Normalize the JSON string.
return new Record(objectMapper.writeValueAsString(objectMapper.readValue(jsonString, Map.class)));
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
};
final Function<String, Record> eventModel = jsonString -> {
try {
return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build());
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
};
return Stream.of(
Arguments.of(stringModel),
Arguments.of(eventModel)
);
private Record jsonStringToRecord(final String jsonString) {
final ObjectMapper objectMapper = new ObjectMapper();
try {
return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build());
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +48,7 @@
import java.util.function.Supplier;

@DataPrepperPlugin(name = "opensearch", pluginType = Sink.class)
public class OpenSearchSink extends AbstractSink<Record<Object>> {
public class OpenSearchSink extends AbstractSink<Record<Event>> {
public static final String BULKREQUEST_LATENCY = "bulkRequestLatency";
public static final String BULKREQUEST_ERRORS = "bulkRequestErrors";
public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes";
Expand Down Expand Up @@ -124,14 +119,14 @@ public void initialize() throws IOException {
}

@Override
public void doOutput(final Collection<Record<Object>> records) {
public void doOutput(final Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
}

AccumulatingBulkRequest<BulkOperation, BulkRequest> bulkRequest = bulkRequestSupplier.get();

for (final Record<Object> record : records) {
for (final Record<Event> record : records) {
final SerializedJson document = getDocument(record.getData());
final Optional<String> docId = getDocumentIdFromDocument(document);

Expand Down Expand Up @@ -200,20 +195,8 @@ private Optional<String> getDocumentIdFromDocument(final SerializedJson document
return Optional.empty();
}

// Temporary function to support both trace and log ingestion pipelines.
// TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
private SerializedJson getDocument(final Object object) {
final String jsonString;
if (object instanceof String) {
jsonString = (String) object;
} else if (object instanceof Event) {
jsonString = ((Event) object).toJsonString();

} else {
throw new RuntimeException("Invalid record type. OpenSearch sink only supports String and Events");
}

return SerializedJson.fromString(jsonString);
private SerializedJson getDocument(final Event event) {
return SerializedJson.fromString(event.toJsonString());
}

private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
Expand All @@ -230,12 +213,6 @@ private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
});
}

private Map<String, Object> getMapFromJson(final String documentJson) throws IOException {
final XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, documentJson);
return parser.map();
}

private void logFailure(final BulkOperation bulkOperation, final Throwable failure) {
if (dlqWriter != null) {
try {
Expand Down
67 changes: 0 additions & 67 deletions data-prepper-plugins/otel-trace-group-prepper/README.md

This file was deleted.

Loading

0 comments on commit 7bc0d67

Please sign in to comment.