Skip to content

Commit

Permalink
Add ListPitInfo::getKeepAlive() getter (opensearch-project#14495)
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored and wangdongyu.danny committed Aug 22, 2024
1 parent b296cd1 commit 5ae704f
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465))
- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190))
- Fix aggs result of NestedAggregator with sub NestedAggregator ([#13324](https://github.com/opensearch-project/OpenSearch/pull/13324))
- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ public void testPitCreatedOnReplica() throws Exception {
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.setRequestCache(false)
.get();
PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testPit() throws Exception {
assertEquals(2, searchResponse.getSuccessfulShards());
assertEquals(2, searchResponse.getTotalShards());
validatePitStats("index", 2, 2);
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, client(), pitResponse.getId());
}

Expand All @@ -131,7 +131,12 @@ public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exce
public Settings onNodeStopped(String nodeName) throws Exception {
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(
client(),
pitResponse.getId(),
pitResponse.getCreationTime(),
TimeValue.timeValueDays(1)
);
assertSegments(false, "index", 1, client(), pitResponse.getId());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(2, pitResponse.getTotalShards());
Expand Down Expand Up @@ -164,7 +169,12 @@ public Settings onNodeStopped(String nodeName) throws Exception {
assertEquals(0, searchResponse.getSkippedShards());
assertEquals(2, searchResponse.getTotalShards());
validatePitStats("index", 1, 1);
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(
client(),
pitResponse.getId(),
pitResponse.getCreationTime(),
TimeValue.timeValueDays(1)
);
return super.onNodeStopped(nodeName);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public long getCreationTime() {
return creationTime;
}

public long getKeepAlive() {
return keepAlive;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(pitId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.query.IdsQueryBuilder;
Expand Down Expand Up @@ -97,7 +98,8 @@ public static String getPitId() {
return SearchContextId.encode(array.asList(), aliasFilters, version);
}

public static void assertUsingGetAllPits(Client client, String id, long creationTime) throws ExecutionException, InterruptedException {
public static void assertUsingGetAllPits(Client client, String id, long creationTime, TimeValue keepAlive) throws ExecutionException,
InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
Expand All @@ -113,6 +115,7 @@ public static void assertUsingGetAllPits(Client client, String id, long creation
GetAllPitNodesResponse getPitResponse = execute1.get();
assertTrue(getPitResponse.getPitInfos().get(0).getPitId().contains(id));
Assert.assertEquals(getPitResponse.getPitInfos().get(0).getCreationTime(), creationTime);
Assert.assertEquals(getPitResponse.getPitInfos().get(0).getKeepAlive(), keepAlive.getMillis());
}

public static void assertGetAllPitsEmpty(Client client) throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti
request.setIndices(new String[] { "index" });
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, client(), pitResponse.getId());
client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index")
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException,

ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse response = execute.get();
PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, client(), response.getId());
assertEquals(4, response.getSuccessfulShards());
assertEquals(4, service.getActiveContexts());
Expand All @@ -127,7 +127,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I
request.setIndices(new String[] { "index" });
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, client(), pitResponse.getId());
client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index")
Expand Down Expand Up @@ -229,7 +229,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx
request.setIndices(new String[] { "index" });
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, client(), pitResponse.getId());
SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
Expand Down Expand Up @@ -412,7 +412,7 @@ public void testPitAfterUpdateIndex() throws Exception {
request.setIndices(new String[] { "test" });
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueMinutes(2));
SearchService service = getInstanceFromNode(SearchService.class);

assertThat(
Expand Down Expand Up @@ -570,7 +570,7 @@ public void testConcurrentSearches() throws Exception {
request.setIndices(new String[] { "index" });
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, client(), pitResponse.getId());
Thread[] threads = new Thread[5];
CountDownLatch latch = new CountDownLatch(threads.length);
Expand Down

0 comments on commit 5ae704f

Please sign in to comment.