Skip to content

Commit

Permalink
Retry on ClusterBlockException on transform destination index (elasti…
Browse files Browse the repository at this point in the history
…c#118194)

* Retry on ClusterBlockException on transform destination index

* Update docs/changelog/118194.yaml

* Cleaning up tests

* Fixing tests

---------

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
dan-rubinstein and elasticmachine committed Dec 12, 2024
1 parent 159ecaf commit a8faebb
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 61 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/118194.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118194
summary: Retry on `ClusterBlockException` on transform destination index
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,14 @@ private void handleScriptException(ScriptException scriptException, boolean unat
* @param numFailureRetries the number of configured retries
*/
private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) {
if (unattended == false && bulkIndexingException.isIrrecoverable()) {
if (bulkIndexingException.getCause() instanceof ClusterBlockException) {
retryWithoutIncrementingFailureCount(
bulkIndexingException,
bulkIndexingException.getDetailedMessage(),
unattended,
numFailureRetries
);
} else if (unattended == false && bulkIndexingException.isIrrecoverable()) {
String message = TransformMessages.getMessage(
TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR,
bulkIndexingException.getDetailedMessage()
Expand Down Expand Up @@ -232,12 +239,46 @@ private void retry(Throwable unwrappedException, String message, boolean unatten
&& unwrappedException.getClass().equals(context.getLastFailure().getClass());

final int failureCount = context.incrementAndGetFailureCount(unwrappedException);

if (unattended == false && numFailureRetries != -1 && failureCount > numFailureRetries) {
fail(unwrappedException, "task encountered more than " + numFailureRetries + " failures; latest failure: " + message);
return;
}

logRetry(unwrappedException, message, unattended, numFailureRetries, failureCount, repeatedFailure);
}

/**
* Terminate failure handling without incrementing the retries used
* <p>
* This is used when there is an ongoing recoverable issue and we want to retain
* retries for any issues that may occur after the issue is resolved
*
* @param unwrappedException The exception caught
* @param message error message to log/audit
* @param unattended whether the transform runs in unattended mode
* @param numFailureRetries the number of configured retries
*/
private void retryWithoutIncrementingFailureCount(
Throwable unwrappedException,
String message,
boolean unattended,
int numFailureRetries
) {
// group failures to decide whether to report it below
final boolean repeatedFailure = context.getLastFailure() != null
&& unwrappedException.getClass().equals(context.getLastFailure().getClass());

logRetry(unwrappedException, message, unattended, numFailureRetries, context.getFailureCount(), repeatedFailure);
}

private void logRetry(
Throwable unwrappedException,
String message,
boolean unattended,
int numFailureRetries,
int failureCount,
boolean repeatedFailure
) {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
// and if the number of retries is about to exceed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;

import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -63,9 +64,121 @@ public int getFailureCountChangedCounter() {
}
}

public void testUnattended() {
public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeLessThanMinimumPageSize() {
var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 0, randomFrom(CircuitBreaker.Durability.values()));
assertRetryIfUnattendedOtherwiseFail(e);
}

public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeNotLessThanMinimumPageSize() {
var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 1, randomFrom(CircuitBreaker.Durability.values()));

List.of(true, false).forEach((unattended) -> { assertNoFailureAndContextPageSizeSet(e, unattended, 365); });
}

public void testHandleIndexerFailure_ScriptException() {
var e = new ScriptException(
randomAlphaOfLength(10),
new ArithmeticException(randomAlphaOfLength(10)),
singletonList(randomAlphaOfLength(10)),
randomAlphaOfLength(10),
randomAlphaOfLength(10)
);
assertRetryIfUnattendedOtherwiseFail(e);
}

public void testHandleIndexerFailure_BulkIndexExceptionWrappingClusterBlockException() {
final BulkIndexingException bulkIndexingException = new BulkIndexingException(
randomAlphaOfLength(10),
new ClusterBlockException(Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))),
randomBoolean()
);

List.of(true, false).forEach((unattended) -> { assertRetryFailureCountNotIncremented(bulkIndexingException, unattended); });
}

public void testHandleIndexerFailure_IrrecoverableBulkIndexException() {
final BulkIndexingException e = new BulkIndexingException(
randomAlphaOfLength(10),
new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR),
true
);
assertRetryIfUnattendedOtherwiseFail(e);
}

public void testHandleIndexerFailure_RecoverableBulkIndexException() {
final BulkIndexingException bulkIndexingException = new BulkIndexingException(
randomAlphaOfLength(10),
new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR),
false
);

List.of(true, false).forEach((unattended) -> { assertRetry(bulkIndexingException, unattended); });
}

public void testHandleIndexerFailure_ClusterBlockException() {
List.of(true, false).forEach((unattended) -> {
assertRetry(
new ClusterBlockException(Map.of(randomAlphaOfLength(10), Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))),
unattended
);
});
}

public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithNoShardSearchFailures() {
List.of(true, false).forEach((unattended) -> {
assertRetry(
new SearchPhaseExecutionException(randomAlphaOfLength(10), randomAlphaOfLength(10), ShardSearchFailure.EMPTY_ARRAY),
unattended
);
});
}

public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithShardSearchFailures() {
List.of(true, false).forEach((unattended) -> {
assertRetry(
new SearchPhaseExecutionException(
randomAlphaOfLength(10),
randomAlphaOfLength(10),
new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
),
unattended
);
});
}

public void testHandleIndexerFailure_RecoverableElasticsearchException() {
List.of(true, false).forEach((unattended) -> {
assertRetry(new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR), unattended);
});
}

public void testHandleIndexerFailure_IrrecoverableElasticsearchException() {
var e = new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.NOT_FOUND);
assertRetryIfUnattendedOtherwiseFail(e);
}

public void testHandleIndexerFailure_IllegalArgumentException() {
var e = new IllegalArgumentException(randomAlphaOfLength(10));
assertRetryIfUnattendedOtherwiseFail(e);
}

public void testHandleIndexerFailure_UnexpectedException() {
List.of(true, false).forEach((unattended) -> { assertRetry(new Exception(), unattended); });
}

private void assertRetryIfUnattendedOtherwiseFail(Exception e) {
List.of(true, false).forEach((unattended) -> {
if (unattended) {
assertRetry(e, unattended);
} else {
assertFailure(e);
}
});
}

private void assertRetry(Exception e, boolean unattended) {
String transformId = randomAlphaOfLength(10);
SettingsConfig settings = new SettingsConfig.Builder().setUnattended(true).build();
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();

MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
MockTransformContextListener contextListener = new MockTransformContextListener();
Expand All @@ -74,51 +187,33 @@ public void testUnattended() {

TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);

handler.handleIndexerFailure(
new SearchPhaseExecutionException(
"query",
"Partial shards failure",
new ShardSearchFailure[] {
new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, CircuitBreaker.Durability.TRANSIENT)) }
),
settings
);
assertNoFailure(handler, e, contextListener, settings, true);
assertNoFailure(handler, e, contextListener, settings, true);
if (unattended) {
assertNoFailure(handler, e, contextListener, settings, true);
} else {
// fail after max retry attempts reached
assertFailure(handler, e, contextListener, settings, true);
}
}

// CBE isn't a failure, but it only affects page size(which we don't test here)
assertFalse(contextListener.getFailed());
assertEquals(0, contextListener.getFailureCountChangedCounter());
private void assertRetryFailureCountNotIncremented(Exception e, boolean unattended) {
String transformId = randomAlphaOfLength(10);
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();

assertNoFailure(
handler,
new SearchPhaseExecutionException(
"query",
"Partial shards failure",
new ShardSearchFailure[] {
new ShardSearchFailure(
new ScriptException(
"runtime error",
new ArithmeticException("/ by zero"),
singletonList("stack"),
"test",
"painless"
)
) }
),
contextListener,
settings
);
assertNoFailure(
handler,
new ElasticsearchStatusException("something really bad happened", RestStatus.INTERNAL_SERVER_ERROR),
contextListener,
settings
);
assertNoFailure(handler, new IllegalArgumentException("expected apples not oranges"), contextListener, settings);
assertNoFailure(handler, new RuntimeException("the s*** hit the fan"), contextListener, settings);
assertNoFailure(handler, new NullPointerException("NPE"), contextListener, settings);
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
MockTransformContextListener contextListener = new MockTransformContextListener();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
context.setPageSize(500);

TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);

assertNoFailure(handler, e, contextListener, settings, false);
assertNoFailure(handler, e, contextListener, settings, false);
assertNoFailure(handler, e, contextListener, settings, false);
}

public void testClusterBlock() {
private void assertFailure(Exception e) {
String transformId = randomAlphaOfLength(10);
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).build();

Expand All @@ -129,32 +224,50 @@ public void testClusterBlock() {

TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);

final ClusterBlockException clusterBlock = new ClusterBlockException(
Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))
);
assertFailure(handler, e, contextListener, settings, false);
}

handler.handleIndexerFailure(clusterBlock, settings);
assertFalse(contextListener.getFailed());
assertEquals(1, contextListener.getFailureCountChangedCounter());
private void assertNoFailure(
TransformFailureHandler handler,
Exception e,
MockTransformContextListener mockTransformContextListener,
SettingsConfig settings,
boolean failureCountIncremented
) {
handler.handleIndexerFailure(e, settings);
assertFalse(mockTransformContextListener.getFailed());
assertEquals(failureCountIncremented ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter());
mockTransformContextListener.reset();
}

handler.handleIndexerFailure(clusterBlock, settings);
assertFalse(contextListener.getFailed());
assertEquals(2, contextListener.getFailureCountChangedCounter());
private void assertNoFailureAndContextPageSizeSet(Exception e, boolean unattended, int newPageSize) {
String transformId = randomAlphaOfLength(10);
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();

handler.handleIndexerFailure(clusterBlock, settings);
assertTrue(contextListener.getFailed());
assertEquals(3, contextListener.getFailureCountChangedCounter());
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
MockTransformContextListener contextListener = new MockTransformContextListener();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
context.setPageSize(500);

TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);

handler.handleIndexerFailure(e, settings);
assertFalse(contextListener.getFailed());
assertEquals(0, contextListener.getFailureCountChangedCounter());
assertEquals(newPageSize, context.getPageSize());
contextListener.reset();
}

private void assertNoFailure(
private void assertFailure(
TransformFailureHandler handler,
Exception e,
MockTransformContextListener mockTransformContextListener,
SettingsConfig settings
SettingsConfig settings,
boolean failureCountChanged
) {
handler.handleIndexerFailure(e, settings);
assertFalse(mockTransformContextListener.getFailed());
assertEquals(1, mockTransformContextListener.getFailureCountChangedCounter());
assertTrue(mockTransformContextListener.getFailed());
assertEquals(failureCountChanged ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter());
mockTransformContextListener.reset();
}

Expand Down

0 comments on commit a8faebb

Please sign in to comment.