Skip to content

Commit

Permalink
Merge branch 'main' into snapshot-pinned-timestamp-delete
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
Anshu Agarwal committed Sep 3, 2024
2 parents 54bee31 + a60b668 commit c5ef40f
Show file tree
Hide file tree
Showing 51 changed files with 2,007 additions and 465 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testSnapshotsStatus() {
boolean ignoreUnavailable = randomBoolean();
String endpoint = "/_snapshot/" + repository + "/" + snapshotNames.toString() + "/_status";

SnapshotsStatusRequest snapshotsStatusRequest = new SnapshotsStatusRequest(repository, snapshots, indices);
SnapshotsStatusRequest snapshotsStatusRequest = (new SnapshotsStatusRequest(repository, snapshots)).indices(indices);
RequestConvertersTests.setRandomClusterManagerTimeout(snapshotsStatusRequest, expectedParams);
snapshotsStatusRequest.ignoreUnavailable(ignoreUnavailable);
expectedParams.put("ignore_unavailable", Boolean.toString(ignoreUnavailable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
}

public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable);
}

public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
writer.write(this, writeable);
} else {
writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName,
String[] snapshots, String[] indices) throws IOException {
final SnapshotsStatusResponse statusResponse = client.snapshot()
.status(new SnapshotsStatusRequest(repoName, snapshots, indices), RequestOptions.DEFAULT);
.status((new SnapshotsStatusRequest(repoName, snapshots)).indices(indices), RequestOptions.DEFAULT);
for (SnapshotStatus status : statusResponse.getSnapshots()) {
assertThat(status.getShardsStats().getFailedShards(), is(0));
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
out.writeOptionalWriteable(currentNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode);
out.writeOptionalWriteable(relocationTargetNode);
out.writeOptionalWriteable(clusterInfo);
shardAllocationDecision.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ private static StorageType fromString(String string) {
private StorageType storageType = StorageType.LOCAL;
@Nullable
private String sourceRemoteStoreRepository = null;
@Nullable
private String sourceRemoteTranslogRepository = null;

@Nullable // if any snapshot UUID will do
private String snapshotUuid;
Expand Down Expand Up @@ -159,6 +161,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
sourceRemoteStoreRepository = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
sourceRemoteTranslogRepository = in.readOptionalString();
}
}

@Override
Expand All @@ -183,6 +188,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeOptionalString(sourceRemoteStoreRepository);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalString(sourceRemoteTranslogRepository);
}
}

@Override
Expand Down Expand Up @@ -545,6 +553,16 @@ public RestoreSnapshotRequest setSourceRemoteStoreRepository(String sourceRemote
return this;
}

/**
* Sets Source Remote Translog Repository for all the restored indices
*
* @param sourceRemoteTranslogRepository name of the remote translog repository that should be used for all restored indices.
*/
public RestoreSnapshotRequest setSourceRemoteTranslogRepository(String sourceRemoteTranslogRepository) {
this.sourceRemoteTranslogRepository = sourceRemoteTranslogRepository;
return this;
}

/**
* Returns Source Remote Store Repository for all the restored indices
*
Expand All @@ -554,6 +572,15 @@ public String getSourceRemoteStoreRepository() {
return sourceRemoteStoreRepository;
}

/**
* Returns Source Remote Translog Repository for all the restored indices
*
* @return source Remote Translog Repository
*/
public String getSourceRemoteTranslogRepository() {
return sourceRemoteTranslogRepository;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -673,6 +700,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (sourceRemoteStoreRepository != null) {
builder.field("source_remote_store_repository", sourceRemoteStoreRepository);
}
if (sourceRemoteTranslogRepository != null) {
builder.field("source_remote_translog_repository", sourceRemoteTranslogRepository);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -701,7 +731,8 @@ public boolean equals(Object o) {
&& Arrays.equals(ignoreIndexSettings, that.ignoreIndexSettings)
&& Objects.equals(snapshotUuid, that.snapshotUuid)
&& Objects.equals(storageType, that.storageType)
&& Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository);
&& Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository)
&& Objects.equals(sourceRemoteTranslogRepository, that.sourceRemoteTranslogRepository);
return equals;
}

Expand All @@ -721,7 +752,8 @@ public int hashCode() {
indexSettings,
snapshotUuid,
storageType,
sourceRemoteStoreRepository
sourceRemoteStoreRepository,
sourceRemoteTranslogRepository
);
result = 31 * result + Arrays.hashCode(indices);
result = 31 * result + Arrays.hashCode(ignoreIndexSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ public SnapshotsStatusRequest() {}
* @param repository repository name
* @param snapshots list of snapshots
*/
public SnapshotsStatusRequest(String repository, String[] snapshots) {
this.repository = repository;
this.snapshots = snapshots;
}

/**
* Constructs a new get snapshots request with given repository name and list of snapshots
*
* @param repository repository name
* @param snapshots list of snapshots
* @param indices list of indices
*/
public SnapshotsStatusRequest(String repository, String[] snapshots, String[] indices) {
this.repository = repository;
this.snapshots = snapshots;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public DiscoveryNode getNode() {

@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
node.writeToWithAttribute(out);
}
}
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(stateUUID);
metadata.writeTo(out);
routingTable.writeTo(out);
nodes.writeTo(out);
nodes.writeToWithAttribute(out);
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
Expand Down Expand Up @@ -887,13 +887,23 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(toUuid);
out.writeLong(toVersion);
routingTable.writeTo(out);
nodes.writeTo(out);
nodesWriteToWithAttributes(nodes, out);
metadata.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager);
}

private void nodesWriteToWithAttributes(Diff<DiscoveryNodes> nodes, StreamOutput out) throws IOException {
DiscoveryNodes part = nodes.apply(null);
if (part != null) {
out.writeBoolean(true);
part.writeToWithAttribute(out);
} else {
out.writeBoolean(false);
}
}

@Override
public ClusterState apply(ClusterState state) {
Builder builder = new Builder(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
targetNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
targetNode.writeToWithAttribute(out);
out.writeLong(term);
out.writeLong(lastAcceptedTerm);
out.writeLong(lastAcceptedVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public JoinRequest(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(minimumTerm);
out.writeOptionalWriteable(optionalJoin.orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(term);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ public DiscoveryNode(StreamInput in) throws IOException {
for (int i = 0; i < size; i++) {
this.attributes.put(in.readString(), in.readString());
}

int rolesSize = in.readVInt();
final Set<DiscoveryNodeRole> roles = new HashSet<>(rolesSize);
for (int i = 0; i < rolesSize; i++) {
Expand Down Expand Up @@ -359,13 +360,31 @@ public DiscoveryNode(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
writeToUtil(out, false);
} else {
writeToUtil(out, true);
}

}

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil(out, true);
}

public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
writeNodeDetails(out);

out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
if (includeAllAttributes) {
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
} else {
out.writeVInt(0);
}

writeRolesAndVersion(out);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,18 @@ public String shortSummary() {

@Override
public void writeTo(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeTo(output), out);
}

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeToWithAttribute(output), out);
}

public void writeToUtil(final Writer<DiscoveryNode> writer, StreamOutput out) throws IOException {
writeClusterManager(out);
out.writeVInt(nodes.size());
for (DiscoveryNode node : this) {
node.writeTo(out);
writer.write(out, node);
}
}

Expand Down
Loading

0 comments on commit c5ef40f

Please sign in to comment.