Skip to content

Commit

Permalink
Merge branch 'main' into derived-field-mapper
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya authored Apr 2, 2024
2 parents b33293f + 9ba8b4f commit 97a8be8
Show file tree
Hide file tree
Showing 87 changed files with 2,739 additions and 484 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/detect-breaking-change.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: "Detect Breaking Changes"
on:
pull_request

jobs:
detect-breaking-change:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
with:
distribution: temurin # Temurin is a distribution of adoptium
java-version: 21
- uses: gradle/gradle-build-action@v3
with:
cache-disabled: true
arguments: japicmp
gradle-version: 8.7
build-root-directory: server
- if: failure()
run: cat server/build/reports/java-compatibility/report.txt
- if: failure()
uses: actions/upload-artifact@v4
with:
name: java-compatibility-report.html
path: ${{ github.workspace }}/server/build/reports/java-compatibility/report.html

4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Bug] Check phase name before SearchRequestOperationsListener onPhaseStart ([#12035](https://github.com/opensearch-project/OpenSearch/pull/12035))
- Fix Span operation names generated from RestActions ([#12005](https://github.com/opensearch-project/OpenSearch/pull/12005))
- Fix error in RemoteSegmentStoreDirectory when debug logging is enabled ([#12328](https://github.com/opensearch-project/OpenSearch/pull/12328))
- Fix UOE While building Exists query for nested search_as_you_type field ([#12048](https://github.com/opensearch-project/OpenSearch/pull/12048))

### Security

Expand All @@ -111,6 +110,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand All @@ -129,6 +130,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- Fix issue with feature flags where default value may not be honored ([#12849](https://github.com/opensearch-project/OpenSearch/pull/12849))
- Fix UOE While building Exists query for nested search_as_you_type field ([#12048](https://github.com/opensearch-project/OpenSearch/pull/12048))

### Security

Expand Down
79 changes: 79 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ plugins {
id('opensearch.publish')
id('opensearch.internal-cluster-test')
id('opensearch.optional-dependencies')
id('me.champeau.gradle.japicmp') version '0.4.2'
}

publishing {
Expand Down Expand Up @@ -378,3 +379,81 @@ tasks.named("sourcesJar").configure {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
}

/** Compares the current build against a snapshot build */
tasks.register("japicmp", me.champeau.gradle.japicmp.JapicmpTask) {
oldClasspath.from(files("${buildDir}/snapshot/opensearch-${version}.jar"))
newClasspath.from(tasks.named('jar'))
onlyModified = true
failOnModification = true
ignoreMissingClasses = true
annotationIncludes = ['@org.opensearch.common.annotation.PublicApi']
txtOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.txt")
htmlOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.html")
dependsOn downloadSnapshot
}

/** If the Java API Comparison task failed, print a hint if the change should be merged from its target branch */
gradle.taskGraph.afterTask { Task task, TaskState state ->
if (task.name == 'japicmp' && state.failure != null) {
def sha = getGitShaFromJar("${buildDir}/snapshot/opensearch-${version}.jar")
logger.info("Incompatiable java api from snapshot jar built off of commit ${sha}")

if (!inHistory(sha)) {
logger.warn('\u001B[33mPlease merge from the target branch and run this task again.\u001B[0m')
}
}
}

/** Downloads latest snapshot from maven repository */
tasks.register("downloadSnapshot", Copy) {
def mavenSnapshotRepoUrl = "https://aws.oss.sonatype.org/content/repositories/snapshots/"
def groupId = "org.opensearch"
def artifactId = "opensearch"

repositories {
maven {
url mavenSnapshotRepoUrl
}
}

configurations {
snapshotArtifact
}

dependencies {
snapshotArtifact("${groupId}:${artifactId}:${version}:")
}

from configurations.snapshotArtifact
into "$buildDir/snapshot"
}

/** Check if the sha is in the current history */
def inHistory(String sha) {
try {
def commandCheckSha = "git merge-base --is-ancestor ${sha} HEAD"
commandCheckSha.execute()
return true
} catch (Exception) {
return false
}
}

/** Extracts the Git SHA used to build a jar from its manifest */
def getGitShaFromJar(String jarPath) {
def sha = ''
try {
// Open the JAR file
def jarFile = new java.util.jar.JarFile(jarPath)
// Get the manifest from the JAR file
def manifest = jarFile.manifest
def attributes = manifest.mainAttributes
// Assuming the Git SHA is stored under an attribute named 'Git-SHA'
sha = attributes.getValue('Change')
jarFile.close()
} catch (IOException e) {
println "Failed to read the JAR file: $e.message"
}
return sha
}
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ public static final IndexShard newIndexShard(
nodeId,
null,
DefaultRemoteStoreSettings.INSTANCE,
false
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() {
);
}

public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance)
.put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance)
.put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer)
)
);
}

/**
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
* balance per index and across all indices is maintained.
Expand Down Expand Up @@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance();
verifyPrimaryBalance(0.0f);
}

/**
Expand Down Expand Up @@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception {
verifyPerIndexPrimaryBalance();
}

/**
* Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting
* removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes
* when the PREFER_PRIMARY_SHARD_REBALANCE is set to true
*/
public void testAllocationAndRebalanceWithDisruption() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int maxReplicaCount = 2;
final int maxShardCount = 2;
// Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in
// and preventing primary relocations
final int nodeCount = randomIntBetween(5, 10);
final int numberOfIndices = randomIntBetween(1, 10);
final float buffer = randomIntBetween(1, 4) * 0.10f;

logger.info("--> Creating {} nodes", nodeCount);
final List<String> nodeNames = new ArrayList<>();
for (int i = 0; i < nodeCount; i++) {
nodeNames.add(internalCluster().startNode());
}
setAllocationRelocationStrategy(true, true, buffer);

int shardCount, replicaCount;
ClusterState state;
for (int i = 0; i < numberOfIndices; i++) {
shardCount = randomIntBetween(1, maxShardCount);
replicaCount = randomIntBetween(1, maxReplicaCount);
logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
ensureGreen(TimeValue.timeValueSeconds(60));
if (logger.isTraceEnabled()) {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
}
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

final int additionalNodeCount = randomIntBetween(1, 5);
logger.info("--> Adding {} nodes", additionalNodeCount);

internalCluster().startNodes(additionalNodeCount);
ensureGreen(TimeValue.timeValueSeconds(60));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

int nodeCountToStop = additionalNodeCount;
while (nodeCountToStop > 0) {
internalCluster().stopRandomDataNode();
// give replica a chance to promote as primary before terminating node containing the replica
ensureGreen(TimeValue.timeValueSeconds(60));
nodeCountToStop--;
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info("--> Cluster state post nodes stop {}", state);
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);
}

/**
* Utility method which ensures cluster has balanced primary shard distribution across a single index.
* @throws Exception exception
Expand Down Expand Up @@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
}, 60, TimeUnit.SECONDS);
}

private void verifyPrimaryBalance() throws Exception {
private void verifyPrimaryBalance(float buffer) throws Exception {
assertBusy(() -> {
final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
RoutingNodes nodes = currentState.getRoutingNodes();
Expand All @@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception {
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.size();
assertTrue(primaryCount <= avgPrimaryShardsPerNode);
assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer)));
}
}, 60, TimeUnit.SECONDS);
}
Expand Down
Loading

0 comments on commit 97a8be8

Please sign in to comment.