Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into remote-detector-calls
Browse files Browse the repository at this point in the history
  • Loading branch information
engechas authored Mar 5, 2024
2 parents c8124b8 + afc85e0 commit 1f2503d
Show file tree
Hide file tree
Showing 320 changed files with 10,872 additions and 7,665 deletions.
27 changes: 25 additions & 2 deletions .github/workflows/kafka-plugin-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ on:
paths:
- 'data-prepper-plugins/kafka-plugins/**'
- '*gradle*'
pull_request:
pull_request_target:
types: [ opened, synchronize, reopened ]
paths:
- 'data-prepper-plugins/kafka-plugins/**'
- '*gradle*'
workflow_dispatch:

permissions:
id-token: write
contents: read

jobs:
integration-tests:
Expand Down Expand Up @@ -41,9 +45,28 @@ jobs:
- name: Wait for Kafka
run: |
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaStartIT
- name: Configure AWS credentials
id: aws-credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.TEST_IAM_ROLE_ARN }}
aws-region: ${{ secrets.TEST_REGION }}
output-credentials: true
- name: Configure AWS default credentials
run: |
aws configure set default.region ${{ secrets.TEST_REGION }}
aws configure set default.aws_access_key_id ${{ steps.aws-credentials.outputs.aws-access-key-id }}
aws configure set default.aws_secret_access_key ${{ steps.aws-credentials.outputs.aws-secret-access-key }}
aws configure set default.aws_session_token ${{ steps.aws-credentials.outputs.aws-session-token }}
- name: Run Kafka integration tests
run: |
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests KafkaBufferIT --tests KafkaBufferOTelIT
./gradlew data-prepper-plugins:kafka-plugins:integrationTest \
-Dtests.kafka.bootstrap_servers=localhost:9092 \
-Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin \
-Dtests.kafka.kms_key=alias/DataPrepperTesting \
--tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT
- name: Upload Unit Test Results
if: always()
Expand Down
37 changes: 13 additions & 24 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,6 @@
import com.github.jk1.license.render.TextReportRenderer

buildscript {
repositories {
mavenCentral() {
metadataSources {
mavenPom()
ignoreGradleMetadataRedirection()
}
}
maven {
url 'https://plugins.gradle.org/m2/'
metadataSources {
mavenPom()
ignoreGradleMetadataRedirection()
}
}
}
dependencies {
classpath 'com.github.jk1:gradle-license-report:2.1'
}
Expand All @@ -41,14 +26,6 @@ allprojects {
ext {
mavenPublicationRootFile = file("${rootProject.buildDir}/m2")
}

repositories {
mavenCentral()
maven { url 'https://jitpack.io' }
maven {
url 'https://packages.confluent.io/maven/'
}
}

spotless {
format 'markdown', {
Expand Down Expand Up @@ -88,7 +65,7 @@ subprojects {
}
}
dependencies {
implementation platform('com.fasterxml.jackson:jackson-bom:2.15.3')
implementation platform('com.fasterxml.jackson:jackson-bom:2.16.1')
implementation platform('org.eclipse.jetty:jetty-bom:9.4.53.v20231009')
implementation platform('io.micrometer:micrometer-bom:1.10.5')
implementation libs.guava.core
Expand Down Expand Up @@ -194,6 +171,12 @@ subprojects {
}
because 'CVE from transitive dependencies'
}
implementation('com.jayway.jsonpath:json-path') {
version {
require '2.9.0'
}
because 'Fixes CVE-2023-51074 from transitive dependencies'
}
implementation('org.bitbucket.b_c:jose4j') {
version {
require '0.9.3'
Expand All @@ -218,6 +201,12 @@ subprojects {
}
because 'CVE-2023-5072, CVE from transitive dependencies'
}
implementation('org.apache.commons:commons-compress') {
version {
require '1.26.0'
}
because 'CVE-2024-25710, CVE-2024-26308'
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,14 @@ public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
@Override
public void checkpoint(final CheckpointState checkpointState) {
checkpointTimer.record(() -> doCheckpoint(checkpointState));
final int numRecordsToBeChecked = checkpointState.getNumRecordsToBeChecked();
recordsInFlight.addAndGet(-numRecordsToBeChecked);
recordsProcessedCounter.increment(numRecordsToBeChecked);
if (!isByteBuffer()) {
final int numRecordsToBeChecked = checkpointState.getNumRecordsToBeChecked();
recordsInFlight.addAndGet(-numRecordsToBeChecked);
recordsProcessedCounter.increment(numRecordsToBeChecked);
}
}

protected int getRecordsInFlight() {
public int getRecordsInFlight() {
return recordsInFlight.intValue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
import java.io.InputStream;
import java.io.Serializable;
import java.util.function.Consumer;
import java.time.Instant;

public interface ByteDecoder extends Serializable {
/**
* Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
* {@link Record} loaded from the {@link InputStream}.
*
* @param inputStream The input stream for code to process
* @param timeReceived The time received value to be populated in the Record
* @param eventConsumer The consumer which handles each event from the stream
* @throws IOException throws IOException when invalid input is received or incorrect codec name is provided
*/
void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -24,33 +25,36 @@ public class JsonDecoder implements ByteDecoder {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

final JsonParser jsonParser = jsonFactory.createParser(inputStream);

while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
parseRecordsArray(jsonParser, eventConsumer);
parseRecordsArray(jsonParser, timeReceived, eventConsumer);
}
}
}

private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
private void parseRecordsArray(final JsonParser jsonParser, final Instant timeReceived, final Consumer<Record<Event>> eventConsumer) throws IOException {
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);

final Record<Event> record = createRecord(innerJson);
final Record<Event> record = createRecord(innerJson, timeReceived);
eventConsumer.accept(record);
}
}

private Record<Event> createRecord(final Map<String, Object> json) {
final JacksonEvent event = (JacksonEvent)JacksonLog.builder()
private Record<Event> createRecord(final Map<String, Object> json, final Instant timeReceived) {
final JacksonLog.Builder logBuilder = JacksonLog.builder()
.withData(json)
.getThis()
.build();
.getThis();
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();

return new Record<>(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public interface Event extends Serializable {
*/
void delete(String key);

/**
* Delete all keys from the Event
* @since 2.8
*/
void clear();

/**
* Generates a serialized Json string of the entire Event
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -136,10 +137,11 @@ public JsonNode getJsonNode() {
*/
@Override
public void put(final String key, final Object value) {
checkArgument(!key.isEmpty(), "key cannot be an empty string for put method");

final String trimmedKey = checkAndTrimKey(key);

final LinkedList<String> keys = new LinkedList<>(Arrays.asList(trimmedKey.split(SEPARATOR)));
final LinkedList<String> keys = new LinkedList<>(Arrays.asList(trimmedKey.split(SEPARATOR, -1)));

JsonNode parentNode = jsonNode;

Expand Down Expand Up @@ -247,7 +249,12 @@ private <T> List<T> mapNodeToList(final String key, final JsonNode node, final C
}

private JsonPointer toJsonPointer(final String key) {
String jsonPointerExpression = SEPARATOR + key;
final String jsonPointerExpression;
if (key.isEmpty() || key.startsWith("/")) {
jsonPointerExpression = key;
} else {
jsonPointerExpression = SEPARATOR + key;
}
return JsonPointer.compile(jsonPointerExpression);
}

Expand All @@ -259,6 +266,7 @@ private JsonPointer toJsonPointer(final String key) {
@Override
public void delete(final String key) {

checkArgument(!key.isEmpty(), "key cannot be an empty string for delete method");
final String trimmedKey = checkAndTrimKey(key);
final int index = trimmedKey.lastIndexOf(SEPARATOR);

Expand All @@ -276,6 +284,16 @@ public void delete(final String key) {
}
}

@Override
public void clear() {
// Delete all entries from the event
Iterator iter = toMap().keySet().iterator();
JsonNode baseNode = jsonNode;
while (iter.hasNext()) {
((ObjectNode) baseNode).remove((String)iter.next());
}
}

@Override
public String toJsonString() {
return jsonNode.toString();
Expand Down Expand Up @@ -399,24 +417,31 @@ public static boolean isValidEventKey(final String key) {
}
private String checkAndTrimKey(final String key) {
checkKey(key);
return trimKey(key);
return trimTrailingSlashInKey(key);
}

private static void checkKey(final String key) {
checkNotNull(key, "key cannot be null");
checkArgument(!key.isEmpty(), "key cannot be an empty string");
if (key.isEmpty()) {
// Empty string key is valid
return;
}
if (key.length() > MAX_KEY_LENGTH) {
throw new IllegalArgumentException("key cannot be longer than " + MAX_KEY_LENGTH + " characters");
}
if (!isValidKey(key)) {
throw new IllegalArgumentException("key " + key + " must contain only alphanumeric chars with .-_ and must follow JsonPointer (ie. 'field/to/key')");
throw new IllegalArgumentException("key " + key + " must contain only alphanumeric chars with .-_@/ and must follow JsonPointer (ie. 'field/to/key')");
}
}

private String trimKey(final String key) {

final String trimmedLeadingSlash = key.startsWith(SEPARATOR) ? key.substring(1) : key;
return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash;
return trimTrailingSlashInKey(trimmedLeadingSlash);
}

private String trimTrailingSlashInKey(final String key) {
return key.length() > 1 && key.endsWith(SEPARATOR) ? key.substring(0, key.length() - 1) : key;
}

private static boolean isValidKey(final String key) {
Expand All @@ -430,7 +455,9 @@ private static boolean isValidKey(final String key) {
|| c == '-'
|| c == '_'
|| c == '@'
|| c == '/')) {
|| c == '/'
|| c == '['
|| c == ']')) {

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -161,6 +162,18 @@ public Builder withAttributes(final Map<String, Object> attributes) {
return getThis();
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public Builder withTimeReceived(final Instant timeReceived) {
return (Builder)super.withTimeReceived(timeReceived);
}

/**
* Sets the observed time of the log event
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.event.EventType;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -237,6 +238,18 @@ public JacksonExponentialHistogram.Builder withPositiveOffset(int offset) {
return this;
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public JacksonExponentialHistogram.Builder withTimeReceived(final Instant timeReceived) {
return (JacksonExponentialHistogram.Builder)super.withTimeReceived(timeReceived);
}

/**
* Sets the offset for the negative buckets
*
Expand Down
Loading

0 comments on commit 1f2503d

Please sign in to comment.