Skip to content

Commit

Permalink
OpenSearchSink should close open files before retrying initialization (
Browse files Browse the repository at this point in the history
…opensearch-project#2255)

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and kkondaka authored Feb 9, 2023
1 parent e32e1c9 commit f782b66
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public boolean execute() {
}
}
if (waitingPipelineNames.size() > 0) {
LOG.info("One or more Pipelines are not ready even after {} retries.", numRetries);
LOG.info("One or more Pipelines are not ready even after {} retries. Shutting down pipelines", numRetries);
shutdown();
throw new RuntimeException("Failed to start pipelines");
}
transformationPipelines.forEach((name, pipeline) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void doInitialize() {
doInitializeInternal();
} catch (IOException e) {
LOG.warn("Failed to initialize OpenSearch sink, retrying. Error - " + e.getCause());
closeFiles();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize OpenSearch sink.");
this.shutdown();
Expand All @@ -118,6 +119,7 @@ public void doInitialize() {
throw e;
}
LOG.warn("Failed to initialize OpenSearch sink, retrying. Error - " + e.getCause());
closeFiles();
}
}

Expand Down Expand Up @@ -252,9 +254,7 @@ private void logFailure(final BulkOperation bulkOperation, final Throwable failu
}
}

@Override
public void shutdown() {
super.shutdown();
private void closeFiles() {
// Close the client. This closes the low-level client which will close it for both high-level clients.
if (restHighLevelClient != null) {
try {
Expand All @@ -271,4 +271,10 @@ public void shutdown() {
}
}
}

@Override
public void shutdown() {
super.shutdown();
closeFiles();
}
}

0 comments on commit f782b66

Please sign in to comment.