From f782b669f5675192cc56e47fca0bf24d00eabf7d Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Thu, 9 Feb 2023 09:15:19 -0800 Subject: [PATCH] OpenSearchSink should close open files before retrying initialization (#2255) Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../java/org/opensearch/dataprepper/DataPrepper.java | 3 ++- .../plugins/sink/opensearch/OpenSearchSink.java | 12 +++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java index 49fb6624c2..a8e38c3eb0 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java @@ -94,7 +94,8 @@ public boolean execute() { } } if (waitingPipelineNames.size() > 0) { - LOG.info("One or more Pipelines are not ready even after {} retries.", numRetries); + LOG.info("One or more Pipelines are not ready even after {} retries. Shutting down pipelines", numRetries); + shutdown(); throw new RuntimeException("Failed to start pipelines"); } transformationPipelines.forEach((name, pipeline) -> { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 0654a5470f..b930011377 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -107,6 +107,7 @@ public void doInitialize() { doInitializeInternal(); } catch (IOException e) { LOG.warn("Failed to initialize OpenSearch sink, retrying. Error - " + e.getCause()); + closeFiles(); } catch (InvalidPluginConfigurationException e) { LOG.error("Failed to initialize OpenSearch sink."); this.shutdown(); @@ -118,6 +119,7 @@ public void doInitialize() { throw e; } LOG.warn("Failed to initialize OpenSearch sink, retrying. Error - " + e.getCause()); + closeFiles(); } } @@ -252,9 +254,7 @@ private void logFailure(final BulkOperation bulkOperation, final Throwable failu } } - @Override - public void shutdown() { - super.shutdown(); + private void closeFiles() { // Close the client. This closes the low-level client which will close it for both high-level clients. if (restHighLevelClient != null) { try { @@ -271,4 +271,10 @@ public void shutdown() { } } } + + @Override + public void shutdown() { + super.shutdown(); + closeFiles(); + } }