Skip to content

Commit

Permalink
Replace Guava Cache with Caffeine for OpenSearch integration (opensearch-project#3586)
Browse files Browse the repository at this point in the history

Signed-off-by: Roman Kvasnytskyi <[email protected]>
  • Loading branch information
Periecle authored Nov 4, 2023
1 parent 6681e75 commit 377671c
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 24 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ dependencies {
implementation libs.opensearch.java
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation libs.guava.core
implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1'
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:http-client-spi'
Expand All @@ -30,6 +29,7 @@ dependencies {
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:opensearchserverless'
implementation libs.commons.lang3
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'
implementation 'software.amazon.awssdk:apache-client'
testImplementation testLibs.junit.vintage
testImplementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;

public class DynamicIndexManager extends AbstractIndexManager {
private Cache<String, IndexManager> indexManagerCache;
final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30;
final int APPROXIMATE_INDEX_MANAGER_SIZE = 32;
private final Cache<String, IndexManager> indexManagerCache;
private static final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30;
private static final int APPROXIMATE_INDEX_MANAGER_SIZE = 32;
private final long cacheSizeInKB = 1024;
protected RestHighLevelClient restHighLevelClient;
protected OpenSearchClient openSearchClient;
Expand Down Expand Up @@ -47,9 +46,8 @@ public DynamicIndexManager(final IndexType indexType,
this.restHighLevelClient = restHighLevelClient;
this.openSearchSinkConfiguration = openSearchSinkConfiguration;
this.clusterSettingsParser = clusterSettingsParser;
CacheBuilder<String, IndexManager> cacheBuilder = CacheBuilder.newBuilder()
Caffeine<String, IndexManager> cacheBuilder = Caffeine.newBuilder()
.recordStats()
.concurrencyLevel(1)
.maximumWeight(cacheSizeInKB)
.expireAfterAccess(CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES, TimeUnit.MINUTES)
.weigher((k, v) -> APPROXIMATE_INDEX_MANAGER_SIZE);
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/otel-trace-raw-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dependencies {
implementation libs.armeria.grpc
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation libs.guava.core
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'
testImplementation testLibs.junit.vintage
testImplementation 'org.assertj:assertj-core:3.24.2'
testImplementation testLibs.mockito.inline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.oteltrace;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand All @@ -14,8 +16,6 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.trace.Span;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.plugins.processor.oteltrace.model.SpanSet;
import org.opensearch.dataprepper.plugins.processor.oteltrace.model.TraceGroup;
Expand Down Expand Up @@ -62,14 +62,13 @@ public OTelTraceRawProcessor(final OtelTraceRawProcessorConfig otelTraceRawProce
final PluginMetrics pluginMetrics) {
super(pluginMetrics);
traceFlushInterval = SEC_TO_MILLIS * otelTraceRawProcessorConfig.getTraceFlushIntervalSeconds();
final int numProcessWorkers = pipelineDescription.getNumberOfProcessWorkers();
traceIdTraceGroupCache = CacheBuilder.newBuilder()
.concurrencyLevel(numProcessWorkers)
.maximumSize(otelTraceRawProcessorConfig.getTraceGroupCacheMaxSize())
.expireAfterWrite(otelTraceRawProcessorConfig.getTraceGroupCacheTimeToLive().toMillis(), TimeUnit.MILLISECONDS)
.build();

pluginMetrics.gauge(TRACE_GROUP_CACHE_COUNT_METRIC_NAME, traceIdTraceGroupCache, cache -> (double) cache.size());
traceIdTraceGroupCache = Caffeine.newBuilder()
.maximumSize(otelTraceRawProcessorConfig.getTraceGroupCacheMaxSize())
.expireAfterWrite(otelTraceRawProcessorConfig.getTraceGroupCacheTimeToLive().toMillis(), TimeUnit.MILLISECONDS)
.build();


pluginMetrics.gauge(TRACE_GROUP_CACHE_COUNT_METRIC_NAME, traceIdTraceGroupCache, cache -> (double) cache.estimatedSize());
pluginMetrics.gauge(SPAN_SET_COUNT_METRIC_NAME, traceIdSpanSetMap, cache -> (double) cache.size());

LOG.info("Configured Trace Raw Processor with a trace flush interval of {} ms.", traceFlushInterval);
Expand Down Expand Up @@ -109,9 +108,7 @@ private void processSpan(final Span span, final Collection<Span> spanSet) {
spanSet.addAll(rootSpanAndChildren);
} else {
final Optional<Span> populatedChildSpanOptional = processChildSpan(span);
if (populatedChildSpanOptional.isPresent()) {
spanSet.add(populatedChildSpanOptional.get());
}
populatedChildSpanOptional.ifPresent(spanSet::add);
}
}

Expand Down Expand Up @@ -212,7 +209,7 @@ private List<Span> getTracesToFlushByGarbageCollection() {
entryIterator.remove();
}
}
if (recordsToFlush.size() > 0) {
if (!recordsToFlush.isEmpty()) {
LOG.info("Flushing {} records", recordsToFlush.size());
}
} finally {
Expand Down

0 comments on commit 377671c

Please sign in to comment.