Merge branch 'main' into grok
Signed-off-by: Sandesh Kumar <[email protected]>
sandeshkr419 authored Jun 29, 2024
2 parents 520235d + 5c8623f commit bf7e2e8
Showing 84 changed files with 1,540 additions and 194 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
@@ -34,4 +34,5 @@ BWC_VERSION:
- "2.14.0"
- "2.14.1"
- "2.15.0"
- "2.15.1"
- "2.16.0"
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
@@ -11,7 +11,7 @@
# 3. Use the command palette to run the CODEOWNERS: Show owners of current file command, which will display all code owners for the current file.

# Default ownership for all repo files
-* @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah
+* @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah

/modules/transport-netty4/ @peternied

@@ -24,4 +24,4 @@

/.github/ @peternied

-/MAINTAINERS.md @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah
+/MAINTAINERS.md @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah
1 change: 1 addition & 0 deletions .github/workflows/gradle-check.yml
@@ -113,6 +113,7 @@ jobs:
if: success()
uses: codecov/codecov-action@v4
with:
+          token: ${{ secrets.CODECOV_TOKEN }}
files: ./codeCoverage.xml

- name: Create Comment Success
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -9,7 +9,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([#12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([#14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
@@ -24,11 +26,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.gradle.develocity` from 3.17.4 to 3.17.5 ([#14397](https://github.com/opensearch-project/OpenSearch/pull/14397))
- Bump `opentelemetry` from 1.36.0 to 1.39.0 ([#14457](https://github.com/opensearch-project/OpenSearch/pull/14457))
- Bump `azure-identity` from 1.11.4 to 1.13.0, Bump `msal4j` from 1.14.3 to 1.15.1, Bump `msal4j-persistence-extension` from 1.2.0 to 1.3.0 ([#14506](https://github.com/opensearch-project/OpenSearch/pull/14506))
- Bump `com.azure:azure-storage-common` from 12.21.2 to 12.25.1 ([#14517](https://github.com/opensearch-project/OpenSearch/pull/14517))

### Changed
- [Tiered Caching] Move query recomputation logic outside write lock ([#14187](https://github.com/opensearch-project/OpenSearch/pull/14187))
- unsignedLongRangeQuery now returns MatchNoDocsQuery if the lower bound is greater than the upper bound ([#14416](https://github.com/opensearch-project/OpenSearch/pull/14416))
- Updated the `indices.query.bool.max_clause_count` setting from being static to dynamically updateable ([#13568](https://github.com/opensearch-project/OpenSearch/pull/13568))
- Make the class CommunityIdProcessor final ([#14448](https://github.com/opensearch-project/OpenSearch/pull/14448))
- Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))

### Deprecated

Expand All @@ -45,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix fs info reporting negative available size ([#11573](https://github.com/opensearch-project/OpenSearch/pull/11573))
- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))
- Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378))
- Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004))
- Refactoring Grok.validatePatternBank by using an iterative approach ([#14206](https://github.com/opensearch-project/OpenSearch/pull/14206))

### Security
1 change: 1 addition & 0 deletions MAINTAINERS.md
@@ -9,6 +9,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Anas Alkouz | [anasalkouz](https://github.com/anasalkouz) | Amazon |
| Andrew Ross | [andrross](https://github.com/andrross) | Amazon |
| Andriy Redko | [reta](https://github.com/reta) | Aiven |
+| Ashish Singh | [ashking94](https://github.com/ashking94) | Amazon |
| Bukhtawar Khan | [Bukhtawar](https://github.com/Bukhtawar) | Amazon |
| Charlotte Henkle | [CEHENKLE](https://github.com/CEHENKLE) | Amazon |
| Dan Widdis | [dbwiddis](https://github.com/dbwiddis) | Amazon |
15 changes: 14 additions & 1 deletion TESTING.md
@@ -17,6 +17,8 @@ OpenSearch uses [jUnit](https://junit.org/junit5/) for testing, it also uses ran
- [Miscellaneous](#miscellaneous)
- [Running verification tasks](#running-verification-tasks)
- [Testing the REST layer](#testing-the-rest-layer)
+  - [Running REST Tests Against An External Cluster](#running-rest-tests-against-an-external-cluster)
+  - [Debugging REST Tests](#debugging-rest-tests)
- [Testing packaging](#testing-packaging)
- [Testing packaging on Windows](#testing-packaging-on-windows)
- [Testing VMs are disposable](#testing-vms-are-disposable)
@@ -272,7 +274,18 @@ yamlRestTest’s and javaRestTest’s are easy to identify, since they are found

If in doubt about which command to use, simply run <gradle path>:check

-Note that the REST tests, like all the integration tests, can be run against an external cluster by specifying the `tests.cluster` property, which if present needs to contain a comma separated list of nodes to connect to (e.g. localhost:9300).
+## Running REST Tests Against An External Cluster
+
+Note that the REST tests, like all the integration tests, can be run against an external cluster by specifying the following properties: `tests.cluster`, `tests.rest.cluster`, and `tests.clustername`. For a multi-node cluster, use a comma-separated list of nodes.
+
+For example:
+
+    ./gradlew :rest-api-spec:yamlRestTest \
+      -Dtests.cluster=localhost:9200 -Dtests.rest.cluster=localhost:9200 -Dtests.clustername=opensearch
+
+## Debugging REST Tests
+
+You can launch a local OpenSearch cluster in debug mode by following [Launching and debugging from an IDE](#launching-and-debugging-from-an-ide), and then run your REST tests against it as described in [Running REST Tests Against An External Cluster](#running-rest-tests-against-an-external-cluster).
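For example, assuming the debug cluster is started with `./gradlew run --debug-jvm` (described in the IDE debugging section) and listens on the default `localhost:9200`, the same invocation as above can be pointed at it:

    # Terminal 1: start a local cluster that waits for a debugger to attach
    ./gradlew run --debug-jvm

    # Terminal 2: run the REST tests against that cluster
    ./gradlew :rest-api-spec:yamlRestTest \
      -Dtests.cluster=localhost:9200 -Dtests.rest.cluster=localhost:9200 -Dtests.clustername=opensearch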

# Testing packaging

2 changes: 1 addition & 1 deletion buildSrc/version.properties
@@ -1,5 +1,5 @@
opensearch = 3.0.0
-lucene = 9.12.0-snapshot-c896995
+lucene = 9.12.0-snapshot-847316d

bundled_jdk_vendor = adoptium
bundled_jdk = 21.0.3+9
@@ -473,4 +473,17 @@ public void testPublicApiWithProtectedInterface() {

assertThat(failure.diagnotics(), not(hasItem(matching(Diagnostic.Kind.ERROR))));
}

+    /**
+     * The constructor arguments have relaxed semantics at the moment: those could be not annotated or be annotated as {@link InternalApi}
+     */
+    public void testPublicApiConstructorAnnotatedInternalApi() {
+        final CompilerResult result = compile("PublicApiConstructorAnnotatedInternalApi.java", "NotAnnotated.java");
+        assertThat(result, instanceOf(Failure.class));
+
+        final Failure failure = (Failure) result;
+        assertThat(failure.diagnotics(), hasSize(2));
+
+        assertThat(failure.diagnotics(), not(hasItem(matching(Diagnostic.Kind.ERROR))));
+    }
}
@@ -8,9 +8,9 @@

package org.opensearch.common.annotation.processor;

-import org.opensearch.common.annotation.PublicApi;
+import org.opensearch.common.annotation.InternalApi;

-@PublicApi(since = "1.0.0")
+@InternalApi
public class InternalApiAnnotated {

}
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.annotation.processor;

import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.annotation.PublicApi;

@PublicApi(since = "1.0.0")
public class PublicApiConstructorAnnotatedInternalApi {
/**
* The constructors have relaxed semantics at the moment: those could be not annotated or be annotated as {@link InternalApi}
*/
@InternalApi
public PublicApiConstructorAnnotatedInternalApi(NotAnnotated arg) {}
}
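The `NotAnnotated.java` fixture compiled alongside this class is not shown in this commit; a plausible minimal form (an assumption for illustration, not the actual fixture) is simply an unannotated class in the same package:

```java
package org.opensearch.common.annotation.processor;

// Hypothetical sketch of the NotAnnotated fixture referenced above: a type with
// no @PublicApi/@InternalApi annotation, used as a public API constructor argument.
public class NotAnnotated {}
```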
@@ -0,0 +1 @@
51ff4940eb1024184bbaa5dae39695d2392c5bab

This file was deleted.

3 changes: 2 additions & 1 deletion libs/core/src/main/java/org/opensearch/Version.java
@@ -105,7 +105,8 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_14_0 = new Version(2140099, org.apache.lucene.util.Version.LUCENE_9_10_0);
public static final Version V_2_14_1 = new Version(2140199, org.apache.lucene.util.Version.LUCENE_9_10_0);
public static final Version V_2_15_0 = new Version(2150099, org.apache.lucene.util.Version.LUCENE_9_10_0);
-    public static final Version V_2_16_0 = new Version(2160099, org.apache.lucene.util.Version.LUCENE_9_11_0);
+    public static final Version V_2_15_1 = new Version(2150199, org.apache.lucene.util.Version.LUCENE_9_10_0);
+    public static final Version V_2_16_0 = new Version(2160099, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0);
public static final Version CURRENT = V_3_0_0;

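For context, the numeric id passed to each `Version` constructor appears to encode `major * 1_000_000 + minor * 10_000 + revision * 100 + build`, with 99 as the release build number. A quick sanity check of the two ids touched here (a sketch assuming that convention, not part of the commit):

```java
// Sanity check of the apparent version-id encoding used in Version.java.
public class VersionIdCheck {
    static int id(int major, int minor, int revision, int build) {
        return major * 1_000_000 + minor * 10_000 + revision * 100 + build;
    }

    public static void main(String[] args) {
        System.out.println(id(2, 15, 1, 99)); // 2150199, matches V_2_15_1
        System.out.println(id(2, 16, 0, 99)); // 2160099, matches V_2_16_0
    }
}
```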
@@ -8,6 +8,8 @@

package org.opensearch.cache.common.tier;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
@@ -35,9 +37,13 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;
@@ -61,6 +67,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
+    private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
@@ -86,6 +93,12 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private final Map<ICache<K, V>, TierInfo> caches;
private final List<Predicate<V>> policies;

+    /**
+     * This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
+     * only once.
+     */
+    Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> completableFutureMap = new ConcurrentHashMap<>();

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
@@ -182,18 +195,24 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// and it only has to be loaded one time, we should report one miss and the rest hits. But, if we do stats in
// getValueFromTieredCache(),
// we will see all misses. Instead, handle stats in computeIfAbsent().
-        Tuple<V, String> cacheValueTuple = getValueFromTieredCache(false).apply(key);
+        Tuple<V, String> cacheValueTuple;
+        CompletableFuture<Tuple<ICacheKey<K>, V>> future = null;
+        try (ReleasableLock ignore = readLock.acquire()) {
+            cacheValueTuple = getValueFromTieredCache(false).apply(key);
+            if (cacheValueTuple == null) {
+                // Only one of the threads will succeed putting a future into map for the same key.
+                // Rest will fetch existing future and wait on that to complete.
+                future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
+            }
+        }
List<String> heapDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_ON_HEAP);
List<String> diskDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_DISK);

if (cacheValueTuple == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
-            V value = null;
-            try (ReleasableLock ignore = writeLock.acquire()) {
-                value = onHeapCache.computeIfAbsent(key, loader);
-            }
+            V value = compute(key, loader, future);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
@@ -222,6 +241,55 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader, CompletableFuture<Tuple<ICacheKey<K>, V>> future)
throws Exception {
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
if (pair != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers. For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
return null;
};
V value = null;
if (future == null) {
future = completableFutureMap.get(key);
future.handle(handler);
try {
value = loader.load(key);
} catch (Exception ex) {
future.completeExceptionally(ex);
throw new ExecutionException(ex);
}
if (value == null) {
NullPointerException npe = new NullPointerException("Loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Tuple<>(key, value));
}
} else {
try {
value = future.get().v2();
} catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
return value;
}

@Override
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
@@ -328,12 +396,22 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
ICacheKey<K> key = notification.getKey();
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
-        if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) {
+        boolean exceptionOccurredOnDiskCachePut = false;
+        boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue());
+        if (canCacheOnDisk) {
             try (ReleasableLock ignore = writeLock.acquire()) {
                 diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
+            } catch (Exception ex) {
+                // TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception
+                // in this case as it shouldn't cause upstream request to fail.
+                logger.warn("Exception occurred while putting item to disk cache", ex);
+                exceptionOccurredOnDiskCachePut = true;
             }
-            updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
-        } else {
+            if (!exceptionOccurredOnDiskCachePut) {
+                updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
+            }
+        }
+        if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) {
// If the value is not going to the disk cache, send this notification to the TSC's removal listener
// as the value is leaving the TSC entirely
removalListener.onRemoval(notification);
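The heart of this change is the once-only loading pattern: the first thread to miss on a key installs a `CompletableFuture` in the map and runs the loader, while every concurrent caller for the same key finds the existing future and waits on it instead of loading again. A minimal, self-contained sketch of that pattern (illustrative names, not the OpenSearch API):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Minimal sketch of the deduplicated-load pattern used by computeIfAbsent() above.
class OnceOnlyLoader<K, V> {
    private final Map<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    V load(K key, Function<K, V> loader) {
        CompletableFuture<V> fresh = new CompletableFuture<>();
        // Only one thread wins the putIfAbsent race for a given key.
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, fresh);
        if (existing != null) {
            // Somebody else is already loading this key: wait for their result.
            return existing.join();
        }
        try {
            V value = loader.apply(key); // the expensive load runs exactly once
            fresh.complete(value);
            return value;
        } catch (RuntimeException e) {
            fresh.completeExceptionally(e); // unblock waiters with the failure
            throw e;
        } finally {
            inFlight.remove(key); // the future is settled; later misses start fresh
        }
    }
}
```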