Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add to and from XContent to ClusterBlock and ClusterBlocks #13694

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 104 additions & 6 deletions server/src/main/java/org/opensearch/cluster/block/ClusterBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,28 @@

package org.opensearch.cluster.block;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_API;
import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;

/**
* Blocks the cluster for concurrency
Expand All @@ -53,7 +62,21 @@
*/
@PublicApi(since = "1.0.0")
public class ClusterBlock implements Writeable, ToXContentFragment {

private static final Logger logger = LogManager.getLogger(ClusterBlock.class);
static final String KEY_UUID = "uuid";
static final String KEY_DESCRIPTION = "description";
static final String KEY_RETRYABLE = "retryable";
static final String KEY_DISABLE_STATE_PERSISTENCE = "disable_state_persistence";
static final String KEY_LEVELS = "levels";
static final String KEY_STATUS = "status";
static final String KEY_ALLOW_RELEASE_RESOURCES = "allow_release_resources";
private static final Set<String> VALID_FIELDS = Sets.newHashSet(
KEY_UUID,
KEY_DESCRIPTION,
KEY_RETRYABLE,
KEY_DISABLE_STATE_PERSISTENCE,
KEY_LEVELS
);
private final int id;
@Nullable
private final String uuid;
Expand Down Expand Up @@ -154,24 +177,99 @@ public boolean disableStatePersistence() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Metadata.XContentContext context = Metadata.XContentContext.valueOf(params.param(CONTEXT_MODE_PARAM, CONTEXT_MODE_API));
builder.startObject(Integer.toString(id));
if (uuid != null) {
builder.field("uuid", uuid);
builder.field(KEY_UUID, uuid);
}
builder.field("description", description);
builder.field("retryable", retryable);
builder.field(KEY_DESCRIPTION, description);
builder.field(KEY_RETRYABLE, retryable);
if (disableStatePersistence) {
builder.field("disable_state_persistence", disableStatePersistence);
builder.field(KEY_DISABLE_STATE_PERSISTENCE, disableStatePersistence);
}
builder.startArray("levels");
builder.startArray(KEY_LEVELS);
for (ClusterBlockLevel level : levels) {
builder.value(level.name().toLowerCase(Locale.ROOT));
}
builder.endArray();
if (context == Metadata.XContentContext.GATEWAY) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this cause BWC failures for non-remote cluster?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are only passing GATEWAY as a context when custom metadata should be stored as part of the persistent cluster state. Don't see any reason this could cause any BWC issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a possibility of mismatch of nodes with old and new version of this code? (say OpenSearch 2.14 upgrading to 2.15). In that case, this additional fields (introduced in 2.15) can't be read by older nodes running in 2.14.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XContent we are creating is not being sent to the nodes of older version. We will just be uploading this to remote repo. In case of upgrade, we will start publication of cluster state to nodes, when all the nodes have moved to 2.15. So, this would not cause any problem imo

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it not be consumed by the data nodes from remote?

builder.field(KEY_STATUS, status);
builder.field(KEY_ALLOW_RELEASE_RESOURCES, allowReleaseResources);
}
builder.endObject();
return builder;
}

public static ClusterBlock fromXContent(XContentParser parser, int id) throws IOException {
String uuid = null;
String description = null;
boolean retryable = false;
boolean disableStatePersistence = false;
RestStatus status = null;
boolean allowReleaseResources = false;
EnumSet<ClusterBlockLevel> levels = EnumSet.noneOf(ClusterBlockLevel.class);
String currentFieldName = skipBlockID(parser);
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (Objects.requireNonNull(currentFieldName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentFiledName will always be non null after first field. For this logic to work, currentFieldName should be reset, once the corresponding value is parsed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it will always be non Null. I think I will get rid of this nonNull check rather than resetting the field every time we parse something.

case KEY_UUID:
uuid = parser.text();
break;
case KEY_DESCRIPTION:
description = parser.text();
break;
case KEY_RETRYABLE:
retryable = parser.booleanValue();
break;
case KEY_DISABLE_STATE_PERSISTENCE:
disableStatePersistence = parser.booleanValue();
break;
case KEY_STATUS:
status = RestStatus.valueOf(parser.text());
break;
case KEY_ALLOW_RELEASE_RESOURCES:
allowReleaseResources = parser.booleanValue();
break;
default:
logger.warn("unknown field [{}]", currentFieldName);
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (currentFieldName.equals(KEY_LEVELS)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
levels.add(ClusterBlockLevel.fromString(parser.text(), Locale.ROOT));
}
} else {
logger.warn("unknown field [{}]", currentFieldName);
}
} else {
throw new IllegalArgumentException("unexpected token [" + token + "]");
}
}
return new ClusterBlock(id, uuid, description, retryable, disableStatePersistence, allowReleaseResources, status, levels);
}

private static String skipBlockID(XContentParser parser) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add explanation of why this is needed? Probably an example will help here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have written the fromXContent is a way that if it can parse the ClusterBlock as a XContentFragment as well as a complete Object. This specific method is added to enable the parsing ClusterBlock as a complete XContentObject like given below.

{
  "block-1" : {
    "uuid": ....
  }
}

Although, we might never need to parse such object. But the UT I have written in ClusterBlockTests.java, we are creating a complete XContentObject from cluster block and trying to parse it back to ClusterBlock object, that is the reason this was required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we differentiate between XContentObject and XConentFragment? Can we separate XContentObject parsing to a separate function and then use the above method to parse the XConentFragment inside that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XContentObject has a paired start end object. While for fragment that is not true. Sure, we can separate the logic to parse the fragment to a innerParser and use that in fromXContent method which can accept both Object and a Fragment.

if (parser.currentToken() == null) {
shiv0408 marked this conversation as resolved.
Show resolved Hide resolved
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
parser.nextToken();
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if (VALID_FIELDS.contains(currentFieldName)) {
return currentFieldName;
} else {
// we have hit block id, just move on
parser.nextToken();
shiv0408 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.common.annotation.PublicApi;

import java.util.EnumSet;
import java.util.Locale;

/**
* What level to block the cluster
Expand All @@ -51,4 +52,11 @@ public enum ClusterBlockLevel {

public static final EnumSet<ClusterBlockLevel> ALL = EnumSet.allOf(ClusterBlockLevel.class);
public static final EnumSet<ClusterBlockLevel> READ_WRITE = EnumSet.of(READ, WRITE);

/*
* This method is used to convert a string to a ClusterBlockLevel.
* */
public static ClusterBlockLevel fromString(String level, Locale locale) {
return ClusterBlockLevel.valueOf(level.toUpperCase(locale));
}
}
114 changes: 112 additions & 2 deletions server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.cluster.block;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -42,6 +44,11 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -62,7 +69,8 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> implements ToXContentFragment {
private static final Logger logger = LogManager.getLogger(ClusterBlocks.class);
public static final ClusterBlocks EMPTY_CLUSTER_BLOCK = new ClusterBlocks(emptySet(), Map.of());

private final Set<ClusterBlock> global;
Expand Down Expand Up @@ -325,6 +333,16 @@ public static Diff<ClusterBlocks> readDiffFrom(StreamInput in) throws IOExceptio
return AbstractDiffable.readDiffFrom(ClusterBlocks::readFrom, in);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Builder.toXContext(this, builder, params);
return builder;
}

public static ClusterBlocks fromXContent(XContentParser parser) throws IOException {
return Builder.fromXContent(parser);
}

/**
* An immutable level holder.
*
Expand Down Expand Up @@ -353,6 +371,10 @@ public static Builder builder() {
return new Builder();
}

public static Builder builder(ClusterBlocks clusterBlocks) {
return new Builder(clusterBlocks);
}

/**
* Builder for cluster blocks.
*
Expand All @@ -367,6 +389,11 @@ public static class Builder {

public Builder() {}

public Builder(ClusterBlocks clusterBlocks) {
this.global.addAll(clusterBlocks.global());
this.indices.putAll(clusterBlocks.indices());
}

public Builder blocks(ClusterBlocks blocks) {
global.addAll(blocks.global());
for (final Map.Entry<String, Set<ClusterBlock>> entry : blocks.indices().entrySet()) {
Expand Down Expand Up @@ -426,10 +453,16 @@ public Builder removeGlobalBlock(int blockId) {
}

public Builder addIndexBlock(String index, ClusterBlock block) {
prepareIndexForBlocks(index);
indices.get(index).add(block);
return this;
}

// initialize an index adding further blocks
private Builder prepareIndexForBlocks(String index) {
if (!indices.containsKey(index)) {
indices.put(index, new HashSet<>());
}
indices.get(index).add(block);
return this;
}

Expand Down Expand Up @@ -479,5 +512,82 @@ public ClusterBlocks build() {
}
return new ClusterBlocks(unmodifiableSet(new HashSet<>(global)), indicesBuilder);
}

public static void toXContext(ClusterBlocks blocks, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("blocks");
if (blocks.global().isEmpty() == false) {
builder.startObject("global");
for (ClusterBlock block : blocks.global()) {
block.toXContent(builder, params);
}
builder.endObject();
}

if (blocks.indices().isEmpty() == false) {
builder.startObject("indices");
for (Map.Entry<String, Set<ClusterBlock>> entry : blocks.indices().entrySet()) {
builder.startObject(entry.getKey());
for (ClusterBlock block : entry.getValue()) {
block.toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}

public static ClusterBlocks fromXContent(XContentParser parser) throws IOException {
Builder builder = new Builder();
String currentFieldName = skipBlocksField(parser);
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
currentFieldName = parser.currentName();
parser.nextToken();
switch (currentFieldName) {
case "global":
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
currentFieldName = parser.currentName();
parser.nextToken();
builder.addGlobalBlock(ClusterBlock.fromXContent(parser, Integer.parseInt(currentFieldName)));
}
break;
case "indices":
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
String indexName = parser.currentName();
parser.nextToken();
// prepare for this index as we want to add this to ClusterBlocks even if there is no Block associated with it
builder.prepareIndexForBlocks(indexName);
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
currentFieldName = parser.currentName();
parser.nextToken();
builder.addIndexBlock(indexName, ClusterBlock.fromXContent(parser, Integer.parseInt(currentFieldName)));
}
}
break;
default:
logger.warn("unknown field [{}]", currentFieldName);
}
}
return builder.build();
}

private static String skipBlocksField(XContentParser parser) throws IOException {
if (parser.currentToken() == null) {
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
parser.nextToken();
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
if ("blocks".equals(parser.currentName())) {
parser.nextToken();
} else {
return parser.currentName();
}
}
}
return null;
}
}
}
Loading
Loading