Merge branch 'opensearch-project:main' into saas-connector-sdk
san81 authored Oct 1, 2024
2 parents 1b74575 + db9a849 commit 7d0ff17
Showing 90 changed files with 3,655 additions and 609 deletions.
@@ -33,6 +33,8 @@
public @interface DataPrepperPlugin {
String DEFAULT_DEPRECATED_NAME = "";

String DEFAULT_ALTERNATE_NAME = "";

/**
*
* @return Name of the plugin which should be unique for the type
@@ -46,6 +48,12 @@
*/
String deprecatedName() default DEFAULT_DEPRECATED_NAME;

/**
*
* @return Alternate name of the plugin which should be unique for the type
*/
String[] alternateNames() default {};

/**
* The class type for this plugin.
*
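The alternateNames attribute added above lets a single plugin be registered under more than one name, alongside the existing name and deprecatedName attributes. As a rough sketch of how a plugin author might use it (the plugin class, name, and alternate name below are hypothetical and not part of this commit):

    // Hypothetical pass-through processor; "example_processor" and "sample_processor"
    // are illustrative names only.
    import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
    import org.opensearch.dataprepper.model.event.Event;
    import org.opensearch.dataprepper.model.processor.Processor;
    import org.opensearch.dataprepper.model.record.Record;

    import java.util.Collection;

    @DataPrepperPlugin(
            name = "example_processor",
            alternateNames = {"sample_processor"},
            pluginType = Processor.class)
    public class ExampleProcessor implements Processor<Record<Event>, Record<Event>> {
        @Override
        public Collection<Record<Event>> execute(final Collection<Record<Event>> records) {
            return records; // a real processor would transform the records here
        }

        @Override
        public void prepareForShutdown() {
        }

        @Override
        public boolean isReadyForShutdown() {
            return true;
        }

        @Override
        public void shutdown() {
        }
    }

With a registration like this, pipeline configurations could presumably refer to the same processor as either example_processor or sample_processor.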
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper;

import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
@@ -42,6 +43,7 @@ public class DataPrepper implements PipelinesProvider {
private final PipelinesObserver pipelinesObserver;
private final Map<String, Pipeline> transformationPipelines;
private final Predicate<Map<String, Pipeline>> shouldShutdownOnPipelineFailurePredicate;
private final PipelinesDataFlowModel pipelinesDataFlowModel;

// TODO: Remove DataPrepperServer dependency on DataPrepper
@Inject
@@ -67,8 +69,9 @@ public DataPrepper(
this.pluginFactory = pluginFactory;

transformationPipelines = pipelineTransformer.transformConfiguration();
pipelinesDataFlowModel = pipelineTransformer.getPipelinesDataFlowModel();
this.shouldShutdownOnPipelineFailurePredicate = shouldShutdownOnPipelineFailurePredicate;
if (transformationPipelines.size() == 0) {
if (transformationPipelines.isEmpty()) {
throw new RuntimeException("No valid pipeline is available for execution, exiting");
}
this.peerForwarderServer = peerForwarderServer;
@@ -97,17 +100,21 @@ public void shutdown() {
shutdownServers();
}

private void shutdownPipelines() {
shutdownPipelines(DataPrepperShutdownOptions.defaultOptions());
}

/**
* Triggers the shutdown of all configured valid pipelines.
*/
public void shutdownPipelines() {
public void shutdownPipelines(final DataPrepperShutdownOptions shutdownOptions) {
transformationPipelines.forEach((name, pipeline) -> {
pipeline.removeShutdownObserver(pipelinesObserver);
});

for (final Pipeline pipeline : transformationPipelines.values()) {
LOG.info("Shutting down pipeline: {}", pipeline.getName());
pipeline.shutdown();
pipeline.shutdown(shutdownOptions);
}
}

@@ -127,11 +134,12 @@ public void shutdownServers() {
*
* @param pipeline name of the pipeline
*/
public void shutdownPipelines(final String pipeline) {
public void shutdownPipeline(final String pipeline) {
if (transformationPipelines.containsKey(pipeline)) {
transformationPipelines.get(pipeline).shutdown();
}
}

public PluginFactory getPluginFactory() {
return pluginFactory;
}
@@ -140,6 +148,10 @@ public Map<String, Pipeline> getTransformationPipelines() {
return transformationPipelines;
}

public PipelinesDataFlowModel getPipelinesDataFlowModel() {
return pipelinesDataFlowModel;
}

public void registerShutdownHandler(final DataPrepperShutdownListener shutdownListener) {
this.shutdownListeners.add(shutdownListener);
}
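DataPrepper now exposes an options-based pipeline shutdown and a singular shutdownPipeline(String) for stopping one pipeline by name. A caller holding a DataPrepper instance might use the new API roughly as follows (the timeout values and pipeline name are illustrative, not defaults from this commit):

    import java.time.Duration;

    import org.opensearch.dataprepper.DataPrepper;
    import org.opensearch.dataprepper.DataPrepperShutdownOptions;

    class ShutdownExample {
        static void shutdownAllPipelines(final DataPrepper dataPrepper) {
            final DataPrepperShutdownOptions options = DataPrepperShutdownOptions.builder()
                    .withBufferReadTimeout(Duration.ofSeconds(5))   // stop reading from buffers 5s after shutdown starts
                    .withBufferDrainTimeout(Duration.ofSeconds(30)) // allow buffers up to 30s to drain
                    .build();
            dataPrepper.shutdownPipelines(options);
        }

        static void shutdownOnePipeline(final DataPrepper dataPrepper) {
            dataPrepper.shutdownPipeline("example-pipeline"); // renamed from shutdownPipelines(String)
        }
    }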
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import java.time.Duration;

public class DataPrepperShutdownOptions {
private final Duration bufferReadTimeout;
private final Duration bufferDrainTimeout;

public static DataPrepperShutdownOptions defaultOptions() {
return new DataPrepperShutdownOptions(builder());
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private Duration bufferReadTimeout;
private Duration bufferDrainTimeout;

private Builder() {
}

public Builder withBufferReadTimeout(final Duration bufferReadTimeout) {
this.bufferReadTimeout = bufferReadTimeout;
return this;
}

public Builder withBufferDrainTimeout(final Duration bufferDrainTimeout) {
this.bufferDrainTimeout = bufferDrainTimeout;
return this;
}

public DataPrepperShutdownOptions build() {
return new DataPrepperShutdownOptions(this);
}
}

private DataPrepperShutdownOptions(final Builder builder) {
this.bufferReadTimeout = builder.bufferReadTimeout;
this.bufferDrainTimeout = builder.bufferDrainTimeout;

if(bufferReadTimeout != null && bufferDrainTimeout != null) {
if (bufferReadTimeout.compareTo(bufferDrainTimeout) > 0) {
throw new IllegalArgumentException("Buffer read timeout cannot be greater than buffer drain timeout");
}
}
}

public Duration getBufferReadTimeout() {
return bufferReadTimeout;
}

public Duration getBufferDrainTimeout() {
return bufferDrainTimeout;
}
}
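The private constructor validates the two timeouts against each other: a buffer read timeout longer than the buffer drain timeout is rejected. For example (values are illustrative; assumes java.time.Duration is imported):

    // Read timeout greater than drain timeout: build() throws IllegalArgumentException.
    DataPrepperShutdownOptions.builder()
            .withBufferReadTimeout(Duration.ofSeconds(60))
            .withBufferDrainTimeout(Duration.ofSeconds(30))
            .build();

    // defaultOptions() leaves both timeouts null, so the check is skipped and both
    // getters return null; callers such as PipelineShutdown (below) then fall back to
    // the buffer's own drain timeout and never force-stop buffer reads.
    DataPrepperShutdownOptions defaults = DataPrepperShutdownOptions.defaultOptions();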
@@ -387,4 +387,8 @@ private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buf
.map(b -> (Buffer) b)
.orElseGet(() -> buffer);
}

public PipelinesDataFlowModel getPipelinesDataFlowModel() {
return pipelinesDataFlowModel;
}
}
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.pipeline;

import com.google.common.base.Preconditions;
import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -41,7 +42,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -55,7 +55,7 @@
public class Pipeline {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis();
private volatile AtomicBoolean stopRequested;
private final PipelineShutdown pipelineShutdown;

private final String name;
private final Source source;
@@ -137,7 +137,7 @@ public Pipeline(
this.sinkExecutorService = PipelineThreadPoolExecutor.newFixedThreadPool(processorThreads,
new PipelineThreadFactory(format("%s-sink-worker", name)), this);

stopRequested = new AtomicBoolean(false);
this.pipelineShutdown = new PipelineShutdown(name, buffer);
}

AcknowledgementSetManager getAcknowledgementSetManager() {
@@ -176,7 +176,11 @@ public Collection<Sink> getSinks() {
}

public boolean isStopRequested() {
return stopRequested.get();
return pipelineShutdown.isStopRequested();
}

public boolean isForceStopReadingBuffers() {
return pipelineShutdown.isForceStopReadingBuffers();
}

public Duration getPeerForwarderDrainTimeout() {
@@ -267,6 +271,10 @@ public void execute() {
}
}

public synchronized void shutdown() {
shutdown(DataPrepperShutdownOptions.defaultOptions());
}

/**
* Initiates shutdown of the pipeline by:
* 1. Stopping the source to prevent new items from being consumed
@@ -276,19 +284,20 @@ public void execute() {
* 5. Shutting down processors and sinks
* 6. Stopping the sink ExecutorService
*/
public synchronized void shutdown() {
public synchronized void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) {
LOG.info("Pipeline [{}] - Received shutdown signal with buffer drain timeout {}, processor shutdown timeout {}, " +
"and sink shutdown timeout {}. Initiating the shutdown process",
name, buffer.getDrainTimeout(), processorShutdownTimeout, sinkShutdownTimeout);
try {
source.stop();
stopRequested.set(true);
} catch (Exception ex) {
LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " +
"proceeding with termination of process workers", name, ex);
}

shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor");
pipelineShutdown.shutdown(dataPrepperShutdownOptions);

shutdownExecutorService(processorExecutorService, pipelineShutdown.getBufferDrainTimeout().plus(processorShutdownTimeout), "processor");

processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown));
buffer.shutdown();
Expand All @@ -297,7 +306,7 @@ public synchronized void shutdown() {
.map(DataFlowComponent::getComponent)
.forEach(Sink::shutdown);

shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout.toMillis(), "sink");
shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout, "sink");

LOG.info("Pipeline [{}] - Pipeline fully shutdown.", name);

@@ -312,13 +321,13 @@ public void removeShutdownObserver(final PipelineObserver pipelineObserver) {
observers.remove(pipelineObserver);
}

private void shutdownExecutorService(final ExecutorService executorService, final long timeoutForTerminationInMillis, final String workerName) {
private void shutdownExecutorService(final ExecutorService executorService, final Duration timeoutForTermination, final String workerName) {
LOG.info("Pipeline [{}] - Shutting down {} process workers.", name, workerName);

executorService.shutdown();
try {
if (!executorService.awaitTermination(timeoutForTerminationInMillis, TimeUnit.MILLISECONDS)) {
LOG.warn("Pipeline [{}] - Workers did not terminate in time, forcing termination of {} workers.", name, workerName);
if (!executorService.awaitTermination(timeoutForTermination.toMillis(), TimeUnit.MILLISECONDS)) {
LOG.warn("Pipeline [{}] - Workers did not terminate in {}, forcing termination of {} workers.", name, timeoutForTermination, workerName);
executorService.shutdownNow();
}
} catch (InterruptedException ex) {
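Pipeline now distinguishes two shutdown states: isStopRequested() (shutdown has begun; the source is stopped but buffers may still be draining) and isForceStopReadingBuffers() (the optional buffer read timeout has elapsed). A processing loop elsewhere in the codebase presumably consults both; the sketch below is only illustrative and is not the ProcessWorker code from this commit (bufferIsDrained and readAndProcessOneBatch are hypothetical helpers):

    boolean keepReading = true;
    while (keepReading) {
        if (pipeline.isForceStopReadingBuffers()) {
            keepReading = false;      // read timeout elapsed: stop reading even if data remains
        } else if (pipeline.isStopRequested() && bufferIsDrained()) {
            keepReading = false;      // graceful path: shutdown requested and the buffer is drained
        } else {
            readAndProcessOneBatch(); // hypothetical helper: read from the buffer, process, publish to sinks
        }
    }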
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline;

import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

class PipelineShutdown {
private static final Logger LOG = LoggerFactory.getLogger(PipelineShutdown.class);

private final AtomicBoolean stopRequested = new AtomicBoolean(false);
private final Duration pipelineConfiguredBufferDrainTimeout;
private final Clock clock;
private final String pipelineName;
private Instant shutdownRequestedAt;
private Instant forceStopReadingBuffersAt;
private Duration bufferDrainTimeoutOverride;
private Duration bufferDrainTimeout;

PipelineShutdown(final String pipelineName, final Buffer<?> buffer) {
this(pipelineName, buffer, Clock.systemDefaultZone());
}

PipelineShutdown(String pipelineName, final Buffer<?> buffer, final Clock clock) {
this.pipelineName = pipelineName;
pipelineConfiguredBufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout());
bufferDrainTimeout = pipelineConfiguredBufferDrainTimeout;
this.clock = clock;
}

public void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) {
final boolean stopPreviouslyRequested = stopRequested.get();
if(stopPreviouslyRequested) {
return;
}

stopRequested.set(true);
shutdownRequestedAt = now();

final Duration bufferReadTimeout = dataPrepperShutdownOptions.getBufferReadTimeout();
if(bufferReadTimeout != null) {
forceStopReadingBuffersAt = shutdownRequestedAt.plus(bufferReadTimeout);
}

final Duration bufferDrainTimeoutOverride = dataPrepperShutdownOptions.getBufferDrainTimeout();
if(bufferDrainTimeoutOverride != null) {
this.bufferDrainTimeoutOverride = bufferDrainTimeoutOverride;
bufferDrainTimeout = bufferDrainTimeoutOverride;
}

LOG.info("Started shutdown for pipeline {}. Requested at {}. Force stop reading buffers at {}. The buffer drain timeout to use is {}",
pipelineName, shutdownRequestedAt, forceStopReadingBuffersAt, bufferDrainTimeout);
}

boolean isStopRequested() {
return stopRequested.get();
}

boolean isForceStopReadingBuffers() {
return forceStopReadingBuffersAt != null && now().isAfter(forceStopReadingBuffersAt);
}

public Duration getBufferDrainTimeout() {
return bufferDrainTimeout;
}

private Instant now() {
return Instant.ofEpochMilli(clock.millis());
}
}
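PipelineShutdown derives its timing entirely from the shutdown options and the buffer: the drain timeout defaults to the buffer's own getDrainTimeout() unless overridden, and buffer reads are only force-stopped when a read timeout was supplied. A test-style walk-through (same package, pipelineShutdown built against a mocked buffer; all values are illustrative):

    // Suppose the buffer's getDrainTimeout() is 30 seconds and shutdown is requested at time T.
    pipelineShutdown.shutdown(DataPrepperShutdownOptions.builder()
            .withBufferReadTimeout(Duration.ofSeconds(10))   // forceStopReadingBuffersAt becomes T + 10s
            .withBufferDrainTimeout(Duration.ofSeconds(20))  // overrides the buffer's 30s drain timeout
            .build());

    pipelineShutdown.isStopRequested();           // true; any later shutdown() call returns immediately
    pipelineShutdown.getBufferDrainTimeout();     // 20 seconds (the override)
    pipelineShutdown.isForceStopReadingBuffers(); // false until the clock passes T + 10 seconds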
@@ -5,11 +5,14 @@

package org.opensearch.dataprepper.pipeline;

import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;

import java.util.Map;

/**
* Interface for a provider of available Data Prepper Pipelines.
*/
public interface PipelinesProvider {
Map<String, Pipeline> getTransformationPipelines();
PipelinesDataFlowModel getPipelinesDataFlowModel();
}
Only a subset of the 90 changed files is shown above.
