Skip to content

Commit

Permalink
Run tests on the current JVM for Java 17 & 21 / Gradle 8.8 (opensearc…
Browse files Browse the repository at this point in the history
…h-project#4730)

Run tests on the current JVM rather than always using Java 11 for the tests. This fixes a problem with our current GitHub tests where we are running against only Java 11 even though we want to run against different Java versions (11, 17, 21). Updates the Gradle version to 8.8.

Fix Java 21 support in the AbstractSink by removing usage of Thread::stop which now always throws an UnsupportedOperationException.

Use only microsecond precision time when comparing the times in the event_json codec. These tests are failing now on Java 17 and 21 with precision errors.

Fixed a randomly failing test in BlockingBufferTests where a value 0 caused an IllegalArgumentException.

Logging changes to avoid noise in the Gradle builds in GitHub.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Jul 12, 2024
1 parent 8fac7cf commit 67f3595
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 50 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ subprojects {

test {
useJUnitPlatform()
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.current()
}
reports {
junitXml.required
html.required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand All @@ -51,7 +52,8 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
retryThread = new Thread(sinkThread);
retryThread.start();
}
}
Expand All @@ -76,7 +78,7 @@ public void output(Collection<T> records) {
@Override
public void shutdown() {
if (retryThread != null) {
retryThread.stop();
sinkThread.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;

private volatile boolean isStopped = false;

public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
Expand All @@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
while (!sink.isReady() && numRetries++ < maxRetries) {
while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}

public void stop() {
isStopped = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -30,6 +25,12 @@
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
Expand Down Expand Up @@ -71,13 +72,13 @@ void testMetrics() {
}

@Test
void testSinkNotReady() {
void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<String>> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
Expand All @@ -87,7 +88,10 @@ void testSinkNotReady() {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}

@Test
Expand Down
3 changes: 0 additions & 3 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down Expand Up @@ -89,8 +88,6 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -218,7 +218,7 @@ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
arguments(Random.class),
arguments(Timer.class),
arguments(InputStream.class),
arguments(File.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
return Stream.of(
Arguments.of(0, randomInt + 1, 0.0),
Arguments.of(1, 100, 1.0),
Arguments.of(randomInt, randomInt, 100.0),
Arguments.of(randomInt + 1, randomInt + 1, 100.0),
Arguments.of(randomInt, randomInt + 250, ((double) randomInt / (randomInt + 250)) * 100),
Arguments.of(6, 9, 66.66666666666666),
Arguments.of(531, 1000, 53.1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -28,6 +31,7 @@
import java.io.ByteArrayInputStream;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -56,7 +60,7 @@ public EventJsonInputCodec createInputCodec() {
@ParameterizedTest
@ValueSource(strings = {"", "{}"})
public void emptyTest(String input) throws Exception {
input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}";
input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[" + input + "]}";
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes());
inputCodec = createInputCodec();
Consumer<Record<Event>> consumer = mock(Consumer.class);
Expand All @@ -70,15 +74,15 @@ public void inCompatibleVersionTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"3.0\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
Expand All @@ -95,24 +99,24 @@ public void basicTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -126,24 +130,24 @@ public void test_with_timeReceivedOverridden() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now().minusSeconds(5);
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS).minusSeconds(5);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime)));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -159,7 +163,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -22,6 +25,7 @@
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -64,7 +68,7 @@ public void basicTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -75,8 +79,8 @@ public void basicTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -90,7 +94,7 @@ public void multipleEventsTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -103,8 +107,8 @@ public void multipleEventsTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(3));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -122,7 +126,7 @@ public void extendedTest() throws Exception {

Set<String> tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
List<String> tagsList = tags.stream().collect(Collectors.toList());
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
Instant origTime = startTime.minusSeconds(5);
event.getMetadata().setExternalOriginationTime(origTime);
Expand All @@ -135,11 +139,11 @@ public void extendedTest() throws Exception {
outputCodec.complete(outputStream);
assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON));
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags(), equalTo(tags));
Expand All @@ -157,7 +161,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Loading

0 comments on commit 67f3595

Please sign in to comment.