Skip to content

Commit

Permalink
Improve logging for failed documents in the OpenSearch sink (opensear…
Browse files Browse the repository at this point in the history
…ch-project#3387)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Sep 27, 2023
1 parent 6546901 commit 63695e9
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingUncompressedBulkRequest;
Expand Down Expand Up @@ -75,8 +75,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;

@DataPrepperPlugin(name = "opensearch", pluginType = Sink.class)
public class OpenSearchSink extends AbstractSink<Record<Event>> {
public static final String BULKREQUEST_LATENCY = "bulkRequestLatency";
Expand Down Expand Up @@ -387,7 +385,7 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
BulkOperationWriter.dlqObjectToString(dlqObject), message));
dlqObject.releaseEventHandle(true);
} catch (final IOException e) {
LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e);
LOG.error("Failed to write a document to the DLQ", e);
dlqObject.releaseEventHandle(false);
}
});
Expand All @@ -399,13 +397,17 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
});
} catch (final IOException e) {
dlqObjects.forEach(dlqObject -> {
LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e);
LOG.error("Failed to write a document to the DLQ", e);
dlqObject.releaseEventHandle(false);
});
}
} else {
dlqObjects.forEach(dlqObject -> {
LOG.warn(SENSITIVE, "Document [{}] has failure. DLQ not configured", dlqObject.getFailedData(), failure);

final FailedDlqData failedDlqData = (FailedDlqData) dlqObject.getFailedData();

final String message = failure == null ? failedDlqData.getMessage() : failure.getMessage();
LOG.warn("Document failed to write to OpenSearch with error code {}. Configure a DLQ to save failed documents. Error: {}", failedDlqData.getStatus(), message);
dlqObject.releaseEventHandle(false);
});
}
Expand Down

0 comments on commit 63695e9

Please sign in to comment.