From 377671cf368e2eaaa1cd58ab894cc985aa411100 Mon Sep 17 00:00:00 2001 From: Roman Kvasnytskyi Date: Sat, 4 Nov 2023 21:24:12 +0100 Subject: [PATCH] Replace Guava Cache with Caffeine for OpenSearch integration (#3586) Signed-off-by: Roman Kvasnytskyi --- data-prepper-plugins/opensearch/build.gradle | 2 +- .../opensearch/index/DynamicIndexManager.java | 14 +++++------ .../otel-trace-raw-processor/build.gradle | 2 +- .../oteltrace/OTelTraceRawProcessor.java | 25 ++++++++----------- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/data-prepper-plugins/opensearch/build.gradle b/data-prepper-plugins/opensearch/build.gradle index e74771543b..17c632a359 100644 --- a/data-prepper-plugins/opensearch/build.gradle +++ b/data-prepper-plugins/opensearch/build.gradle @@ -15,7 +15,6 @@ dependencies { implementation libs.opensearch.java implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' - implementation libs.guava.core implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1' implementation 'software.amazon.awssdk:auth' implementation 'software.amazon.awssdk:http-client-spi' @@ -30,6 +29,7 @@ dependencies { implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:opensearchserverless' implementation libs.commons.lang3 + implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8' implementation 'software.amazon.awssdk:apache-client' testImplementation testLibs.junit.vintage testImplementation libs.commons.io diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java index a2e613a13b..4bcc58c924 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java @@ -5,21 +5,20 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import java.io.IOException; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkNotNull; public class DynamicIndexManager extends AbstractIndexManager { - private Cache indexManagerCache; - final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30; - final int APPROXIMATE_INDEX_MANAGER_SIZE = 32; + private final Cache indexManagerCache; + private static final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30; + private static final int APPROXIMATE_INDEX_MANAGER_SIZE = 32; private final long cacheSizeInKB = 1024; protected RestHighLevelClient restHighLevelClient; protected OpenSearchClient openSearchClient; @@ -47,9 +46,8 @@ public DynamicIndexManager(final IndexType indexType, this.restHighLevelClient = restHighLevelClient; this.openSearchSinkConfiguration = openSearchSinkConfiguration; this.clusterSettingsParser = clusterSettingsParser; - CacheBuilder cacheBuilder = CacheBuilder.newBuilder() + Caffeine cacheBuilder = Caffeine.newBuilder() .recordStats() - .concurrencyLevel(1) .maximumWeight(cacheSizeInKB) .expireAfterAccess(CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES, TimeUnit.MINUTES) .weigher((k, v) -> APPROXIMATE_INDEX_MANAGER_SIZE); diff --git a/data-prepper-plugins/otel-trace-raw-processor/build.gradle b/data-prepper-plugins/otel-trace-raw-processor/build.gradle index 72e9f05f85..05ef3b5f4b 100644 --- a/data-prepper-plugins/otel-trace-raw-processor/build.gradle +++ b/data-prepper-plugins/otel-trace-raw-processor/build.gradle @@ -18,7 +18,7 @@ dependencies { implementation libs.armeria.grpc implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' - implementation libs.guava.core + implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8' testImplementation testLibs.junit.vintage testImplementation 'org.assertj:assertj-core:3.24.2' testImplementation testLibs.mockito.inline diff --git a/data-prepper-plugins/otel-trace-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltrace/OTelTraceRawProcessor.java b/data-prepper-plugins/otel-trace-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltrace/OTelTraceRawProcessor.java index 7d89ab2d05..a156590107 100644 --- a/data-prepper-plugins/otel-trace-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltrace/OTelTraceRawProcessor.java +++ b/data-prepper-plugins/otel-trace-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltrace/OTelTraceRawProcessor.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.oteltrace; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -14,8 +16,6 @@ import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.trace.Span; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import io.micrometer.core.instrument.util.StringUtils; import org.opensearch.dataprepper.plugins.processor.oteltrace.model.SpanSet; import org.opensearch.dataprepper.plugins.processor.oteltrace.model.TraceGroup; @@ -62,14 +62,13 @@ public OTelTraceRawProcessor(final OtelTraceRawProcessorConfig otelTraceRawProce final PluginMetrics pluginMetrics) { super(pluginMetrics); traceFlushInterval = SEC_TO_MILLIS * otelTraceRawProcessorConfig.getTraceFlushIntervalSeconds(); - final int numProcessWorkers = pipelineDescription.getNumberOfProcessWorkers(); - traceIdTraceGroupCache = CacheBuilder.newBuilder() - .concurrencyLevel(numProcessWorkers) - .maximumSize(otelTraceRawProcessorConfig.getTraceGroupCacheMaxSize()) - .expireAfterWrite(otelTraceRawProcessorConfig.getTraceGroupCacheTimeToLive().toMillis(), TimeUnit.MILLISECONDS) - .build(); - - pluginMetrics.gauge(TRACE_GROUP_CACHE_COUNT_METRIC_NAME, traceIdTraceGroupCache, cache -> (double) cache.size()); + traceIdTraceGroupCache = Caffeine.newBuilder() + .maximumSize(otelTraceRawProcessorConfig.getTraceGroupCacheMaxSize()) + .expireAfterWrite(otelTraceRawProcessorConfig.getTraceGroupCacheTimeToLive().toMillis(), TimeUnit.MILLISECONDS) + .build(); + + + pluginMetrics.gauge(TRACE_GROUP_CACHE_COUNT_METRIC_NAME, traceIdTraceGroupCache, cache -> (double) cache.estimatedSize()); pluginMetrics.gauge(SPAN_SET_COUNT_METRIC_NAME, traceIdSpanSetMap, cache -> (double) cache.size()); LOG.info("Configured Trace Raw Processor with a trace flush interval of {} ms.", traceFlushInterval); @@ -109,9 +108,7 @@ private void processSpan(final Span span, final Collection spanSet) { spanSet.addAll(rootSpanAndChildren); } else { final Optional populatedChildSpanOptional = processChildSpan(span); - if (populatedChildSpanOptional.isPresent()) { - spanSet.add(populatedChildSpanOptional.get()); - } + populatedChildSpanOptional.ifPresent(spanSet::add); } } @@ -212,7 +209,7 @@ private List getTracesToFlushByGarbageCollection() { entryIterator.remove(); } } - if (recordsToFlush.size() > 0) { + if (!recordsToFlush.isEmpty()) { LOG.info("Flushing {} records", recordsToFlush.size()); } } finally {