Skip to content

Commit

Permalink
Merge branch 'main' into agg-per
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 authored Feb 12, 2024
2 parents 91ccaef + 76ae14a commit 6fef82e
Show file tree
Hide file tree
Showing 33 changed files with 874 additions and 231 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
name: results.txt

- name: Find Comment
uses: peter-evans/find-comment@v2
uses: peter-evans/find-comment@v3
id: fc
with:
issue-number: ${{ github.event.number }}
Expand Down
170 changes: 6 additions & 164 deletions CHANGELOG.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static void configureRepositories(Project project) {
String revision = matcher.group(1);
MavenArtifactRepository luceneRepo = repos.maven(repo -> {
repo.setName("lucene-snapshots");
repo.setUrl("https://artifacts.opensearch.org/snapshots/lucene/");
repo.setUrl("https://ci.opensearch.org/ci/dbc/snapshots/lucene/");
});
repos.exclusiveContent(exclusiveRepo -> {
exclusiveRepo.filter(
Expand Down
6 changes: 3 additions & 3 deletions gradle/code-coverage.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repositories {
gradlePluginPortal()
// TODO: Find the way to use the repositories from RepositoriesSetupPlugin
maven {
url = "https://artifacts.opensearch.org/snapshots/lucene/"
url = "https://ci.opensearch.org/ci/dbc/snapshots/lucene/"
}
}

Expand All @@ -37,14 +37,14 @@ tasks.withType(JacocoReport).configureEach {
if (System.getProperty("tests.coverage")) {
reporting {
reports {
testCodeCoverageReport(JacocoCoverageReport) {
testCodeCoverageReport(JacocoCoverageReport) {
testType = TestSuiteType.UNIT_TEST
}
}
}

// Attach code coverage report task to Gradle check task
project.getTasks().named(JavaBasePlugin.CHECK_TASK_NAME).configure {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
}
}
2 changes: 1 addition & 1 deletion plugins/repository-gcs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ dependencies {

api 'com.google.api-client:google-api-client:2.2.0'

api 'com.google.api.grpc:proto-google-common-protos:2.25.1'
api 'com.google.api.grpc:proto-google-common-protos:2.33.0'
api 'com.google.api.grpc:proto-google-iam-v1:0.12.0'

api "com.google.auth:google-auth-library-credentials:${versions.google_auth}"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
644e11df1cec6d38a63a9a06a701e48c398b87d0
176 changes: 176 additions & 0 deletions release-notes/opensearch.release-notes-2.12.0.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
assertFalse(nodeLocalHealth.isTimedOut());
assertTrue(nodeLocalHealth.hasDiscoveredClusterManager());
}

public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());

// check weighted routing metadata after node restart, ensure node comes healthy after restart
internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand Down Expand Up @@ -175,17 +174,6 @@ private IndexShard getIndexShard(ClusterState state, ShardRouting routing, Strin
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
* Fetch IndexShard by shardId, multiple shards per node allowed.
*/
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
return indexService.getShard(id.get());
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
Expand All @@ -130,7 +131,8 @@ public IndexStatsIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

Expand Down Expand Up @@ -175,7 +177,7 @@ public void testFieldDataStats() throws InterruptedException {
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -299,7 +301,7 @@ public void testClearAllCaches() throws Exception {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -667,7 +669,7 @@ public void testSimpleStats() throws Exception {
client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
refresh();
refreshAndWaitForReplication();

NumShards test1 = getNumShards("test1");
long test1ExpectedWrites = 2 * test1.dataCopies;
Expand All @@ -682,7 +684,13 @@ public void testSimpleStats() throws Exception {
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(0L));
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));

// This assert should not be done on segrep enabled indices because we are asserting Indexing/Write operations count on
// all primary and replica shards. But in case of segrep, Indexing/Write operation don't happen on replica shards. So we can
// ignore this assert check for segrep enabled indices.
if (isSegmentReplicationEnabledForIndex("test1") == false && isSegmentReplicationEnabledForIndex("test2") == false) {
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
}
assertThat(stats.getTotal().getStore(), notNullValue());
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getFlush(), notNullValue());
Expand Down Expand Up @@ -825,6 +833,7 @@ public void testMergeStats() {
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet();

refreshAndWaitForReplication();
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0L));
}
Expand All @@ -851,7 +860,7 @@ public void testSegmentsStats() {

client().admin().indices().prepareFlush().get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareRefresh().get();
refreshAndWaitForReplication();
stats = client().admin().indices().prepareStats().setSegments(true).get();

assertThat(stats.getTotal().getSegments(), notNullValue());
Expand All @@ -869,7 +878,7 @@ public void testAllFlags() throws Exception {
client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();

client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
Flag[] values = CommonStatsFlags.Flag.values();
for (Flag flag : values) {
Expand Down Expand Up @@ -1453,6 +1462,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
refreshAndWaitForReplication();
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
Expand Down
Loading

0 comments on commit 6fef82e

Please sign in to comment.