Skip to content

Commit

Permalink
First (incorrect) attempt to call _resolve/index when TransportVersio…
Browse files Browse the repository at this point in the history
…n failure caught
  • Loading branch information
quux00 committed Dec 20, 2023
1 parent 496b1c0 commit e7aee51
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action.admin.indices.resolve;

import org.elasticsearch.Build;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -45,6 +46,14 @@ private ResolveClusterInfo(boolean connected, Boolean skipUnavailable, Boolean m
}

public ResolveClusterInfo(StreamInput in) throws IOException {
if (in.getTransportVersion().before(TransportVersions.RESOLVE_CLUSTER_ENDPOINT_ADDED)) {
throw new UnsupportedOperationException(
"ResolveClusterAction requires at least Transport Version "
+ TransportVersions.RESOLVE_CLUSTER_ENDPOINT_ADDED
+ " but was "
+ in.getTransportVersion()
);
}
this.connected = in.readBoolean();
this.skipUnavailable = in.readOptionalBoolean();
this.matchingIndices = in.readOptionalBoolean();
Expand All @@ -58,6 +67,14 @@ public ResolveClusterInfo(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().before(TransportVersions.RESOLVE_CLUSTER_ENDPOINT_ADDED)) {
throw new UnsupportedOperationException(
"ResolveClusterAction requires at least Transport Version "
+ TransportVersions.RESOLVE_CLUSTER_ENDPOINT_ADDED
+ " but was "
+ out.getTransportVersion()
);
}
out.writeBoolean(connected);
out.writeOptionalBoolean(skipUnavailable);
out.writeOptionalBoolean(matchingIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,65 @@ public void onFailure(Exception failure) {
return;
}
if (notConnectedError(failure)) {
logger.warn("UUU DEBUG 2");
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(false, skipUnavailable));
} else if (clusterResolveEndpointNotFound(failure)) {
// MP TODO: remove this pathway - I don't think it will ever execute now?
logger.warn("UUU DEBUG 3");
// if the endpoint returns an error that it does not _resolve/cluster, we know we are connected
// TODO: call remoteClusterClient.admin().indices().resolveIndex() to fill in 'matching_indices'?
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(true, skipUnavailable));
} else {
logger.warn("UUU DEBUG 4");
Throwable cause = ExceptionsHelper.unwrapCause(failure);
// it is not clear that this error indicates that the cluster is disconnected, but it is hard to
// determine based on the error, so we default to false in the face of any error and report it
// back to the user for consideration
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(false, skipUnavailable, cause.toString()));
logger.warn(
() -> Strings.format("Failure from _resolve/cluster lookup against cluster %s: ", clusterAlias),
failure
);
if (cause instanceof UnsupportedOperationException
&& cause.getMessage().contains("ResolveClusterAction requires at least Transport Version")) {
logger.warn("UUU DEBUG 5: now calling _resolve/indices instead");
/// MP TODO: document what we're doing here
ResolveIndexAction.Request resolveIndexRequest = new ResolveIndexAction.Request(
originalIndices.indices(),
originalIndices.indicesOptions()
);
remoteClusterClient.admin().indices().resolveIndex(resolveIndexRequest, new ActionListener<>() {
@Override
public void onResponse(ResolveIndexAction.Response response) {
boolean matchingIndices = response.getIndices().size() > 0
|| response.getAliases().size() > 0
|| response.getDataStreams().size() > 0;
logger.warn("UUU DEBUG 44: matchingIndices: " + matchingIndices);
clusterInfoMap.put(
clusterAlias,
new ResolveClusterInfo(true, skipUnavailable, matchingIndices, null)
);
}

@Override
public void onFailure(Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
logger.warn("UUU DEBUG 55: failure: " + e);
clusterInfoMap.put(
clusterAlias,
new ResolveClusterInfo(false, skipUnavailable, cause.toString())
);
logger.warn(
() -> Strings.format(
"Failure from _resolve/cluster lookup against cluster %s: ",
clusterAlias
),
e
);
}
});
} else {
// it is not clear that this error indicates that the cluster is disconnected, but it is hard to
// determine based on the error, so we default to false in the face of any error and report it
// back to the user for consideration
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(false, skipUnavailable, cause.toString()));
logger.warn(
() -> Strings.format("Failure from _resolve/cluster lookup against cluster %s: ", clusterAlias),
failure
);
}
}
if (resolveClusterTask.isCancelled()) {
releaseResourcesOnCancel.run();
Expand Down

0 comments on commit e7aee51

Please sign in to comment.