Skip to content

Commit

Permalink
Adding a new API to get the current transformed pipelines as a JSON (#…
Browse files Browse the repository at this point in the history
…4980)

* Adding a new API to get the current transformed pipelines as a JSON

Signed-off-by: Souvik Bose <[email protected]>

* Rename the api and address comments

Signed-off-by: Souvik Bose <[email protected]>

---------

Signed-off-by: Souvik Bose <[email protected]>
Co-authored-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 and sbose2k21 authored Oct 1, 2024
1 parent ccd0e24 commit db9a849
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper;

import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class DataPrepper implements PipelinesProvider {
private final PipelinesObserver pipelinesObserver;
private final Map<String, Pipeline> transformationPipelines;
private final Predicate<Map<String, Pipeline>> shouldShutdownOnPipelineFailurePredicate;
private final PipelinesDataFlowModel pipelinesDataFlowModel;

// TODO: Remove DataPrepperServer dependency on DataPrepper
@Inject
Expand All @@ -67,8 +69,9 @@ public DataPrepper(
this.pluginFactory = pluginFactory;

transformationPipelines = pipelineTransformer.transformConfiguration();
pipelinesDataFlowModel = pipelineTransformer.getPipelinesDataFlowModel();
this.shouldShutdownOnPipelineFailurePredicate = shouldShutdownOnPipelineFailurePredicate;
if (transformationPipelines.size() == 0) {
if (transformationPipelines.isEmpty()) {
throw new RuntimeException("No valid pipeline is available for execution, exiting");
}
this.peerForwarderServer = peerForwarderServer;
Expand Down Expand Up @@ -145,6 +148,10 @@ public Map<String, Pipeline> getTransformationPipelines() {
return transformationPipelines;
}

public PipelinesDataFlowModel getPipelinesDataFlowModel() {
return pipelinesDataFlowModel;
}

public void registerShutdownHandler(final DataPrepperShutdownListener shutdownListener) {
this.shutdownListeners.add(shutdownListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,8 @@ private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buf
.map(b -> (Buffer) b)
.orElseGet(() -> buffer);
}

public PipelinesDataFlowModel getPipelinesDataFlowModel() {
return pipelinesDataFlowModel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

package org.opensearch.dataprepper.pipeline;

import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;

import java.util.Map;

/**
* Interface for a provider of available Data Prepper Pipelines.
*/
public interface PipelinesProvider {
Map<String, Pipeline> getTransformationPipelines();
PipelinesDataFlowModel getPipelinesDataFlowModel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class DataPrepperServer {
private static final Logger LOG = LoggerFactory.getLogger(DataPrepperServer.class);
private final HttpServerProvider serverProvider;
private final ListPipelinesHandler listPipelinesHandler;
private final GetPipelinesHandler getPipelinesHandler;
private final ShutdownHandler shutdownHandler;
private final PrometheusMeterRegistry prometheusMeterRegistry;
private final Authenticator authenticator;
Expand All @@ -41,12 +42,14 @@ public DataPrepperServer(
final HttpServerProvider serverProvider,
final ListPipelinesHandler listPipelinesHandler,
final ShutdownHandler shutdownHandler,
final GetPipelinesHandler getPipelinesHandler,
@Autowired(required = false) @Nullable final PrometheusMeterRegistry prometheusMeterRegistry,
@Autowired(required = false) @Nullable final Authenticator authenticator
) {
this.serverProvider = serverProvider;
this.listPipelinesHandler = listPipelinesHandler;
this.shutdownHandler = shutdownHandler;
this.getPipelinesHandler = getPipelinesHandler;
this.prometheusMeterRegistry = prometheusMeterRegistry;
this.authenticator = authenticator;
executorService = Executors.newFixedThreadPool(3);
Expand All @@ -67,6 +70,7 @@ private HttpServer createServer() {

createContext(server, listPipelinesHandler, authenticator, "/list");
createContext(server, shutdownHandler, authenticator, "/shutdown");
createContext(server, getPipelinesHandler, authenticator, "/pipelines");

if (prometheusMeterRegistry != null) {
final PrometheusMetricsHandler prometheusMetricsHandler = new PrometheusMetricsHandler(prometheusMeterRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.server;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.pipeline.PipelinesProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.HttpMethod;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;

public class GetPipelinesHandler implements HttpHandler {

private final PipelinesProvider pipelinesProvider;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Logger LOG = LoggerFactory.getLogger(GetPipelinesHandler.class);

public GetPipelinesHandler(final PipelinesProvider pipelinesProvider) {
this.pipelinesProvider = pipelinesProvider;
}

@Override
public void handle(final HttpExchange exchange) throws IOException {
String requestMethod = exchange.getRequestMethod();
if (!requestMethod.equals(HttpMethod.GET)) {
exchange.sendResponseHeaders(HttpURLConnection.HTTP_BAD_METHOD, 0);
exchange.getResponseBody().close();
return;
}

try {
List<PipelineModel> pipelineModels = new ArrayList<>(pipelinesProvider.getPipelinesDataFlowModel().getPipelines().values());

final byte[] response = OBJECT_MAPPER.writeValueAsString(pipelineModels).getBytes();

exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.length);
exchange.getResponseBody().write(response);
} catch (final Exception e) {
LOG.error("Caught exception listing pipelines", e);
exchange.sendResponseHeaders(HttpURLConnection.HTTP_INTERNAL_ERROR, 0);
} finally {
exchange.getResponseBody().close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.pipeline.PipelinesProvider;
import org.opensearch.dataprepper.pipeline.server.DataPrepperCoreAuthenticationProvider;
import org.opensearch.dataprepper.pipeline.server.GetPipelinesHandler;
import org.opensearch.dataprepper.pipeline.server.ListPipelinesHandler;
import org.opensearch.dataprepper.pipeline.server.ShutdownHandler;
import com.sun.net.httpserver.Authenticator;
Expand Down Expand Up @@ -74,4 +75,9 @@ public ListPipelinesHandler listPipelinesHandler(final PipelinesProvider pipelin
public ShutdownHandler shutdownHandler(final DataPrepper dataPrepper) {
return new ShutdownHandler(dataPrepper);
}

@Bean
public GetPipelinesHandler GetPipelinesHandler(final PipelinesProvider pipelinesProvider) {
return new GetPipelinesHandler(pipelinesProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class DataPrepperServerTest {
@Mock
private ShutdownHandler shutdownHandler;

@Mock
private GetPipelinesHandler getPipelinesHandler;

@Mock
private PrometheusMeterRegistry prometheusMeterRegistry;

Expand Down Expand Up @@ -82,7 +85,7 @@ public void testGivenValidServerWhenStartThenShouldCallServerStart() {
verifyServerStart();
verify(server).createContext(eq("/metrics/prometheus"), any(PrometheusMetricsHandler.class));
verify(server).createContext(eq("/metrics/sys"), any(PrometheusMetricsHandler.class));
verify(context, times(4)).setAuthenticator(eq(authenticator));
verify(context, times(5)).setAuthenticator(eq(authenticator));
}

@Test
Expand All @@ -93,7 +96,7 @@ public void testGivenValidServerWhenStartThenShouldCallServerStart_NullPrometheu
dataPrepperServer.start();

verifyServerStart();
verify(context, times(2)).setAuthenticator(eq(authenticator));
verify(context, times(3)).setAuthenticator(eq(authenticator));
}

@Test
Expand Down Expand Up @@ -145,6 +148,7 @@ private void verifyServerStart() {
verify(httpServerProvider).get();
verify(server).createContext("/list", listPipelinesHandler);
verify(server).createContext(eq("/shutdown"), eq(shutdownHandler));
verify(server).createContext(eq("/pipelines"), eq(getPipelinesHandler));
final ArgumentCaptor<ExecutorService> executorServiceArgumentCaptor = ArgumentCaptor.forClass(ExecutorService.class);
verify(server).setExecutor(executorServiceArgumentCaptor.capture());
final ExecutorService actualExecutorService = executorServiceArgumentCaptor.getValue();
Expand All @@ -158,6 +162,6 @@ private void verifyServerStart() {
}

private DataPrepperServer createObjectUnderTest(final PrometheusMeterRegistry prometheusMeterRegistry, final Authenticator authenticator) {
return new DataPrepperServer(httpServerProvider, listPipelinesHandler, shutdownHandler, prometheusMeterRegistry, authenticator);
return new DataPrepperServer(httpServerProvider, listPipelinesHandler, shutdownHandler, getPipelinesHandler, prometheusMeterRegistry, authenticator);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.server;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.pipeline.PipelinesProvider;

import javax.ws.rs.HttpMethod;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class GetPipelinesHandlerTest {
@Mock
private PipelinesProvider pipelinesProvider;
@Mock
private HttpExchange httpExchange;

@Mock
private OutputStream outputStream;

@BeforeEach
public void beforeEach() {
when(httpExchange.getResponseBody())
.thenReturn(outputStream);
}

private GetPipelinesHandler createObjectUnderTest() {
return new GetPipelinesHandler(pipelinesProvider);
}

@ParameterizedTest
@ValueSource(strings = { HttpMethod.GET })
public void testGivenPipelinesThenResponseWritten(String httpMethod) throws IOException {
final String pipelineName = "test-pipeline";
final Headers headers = mock(Headers.class);
doNothing().when(headers).add(anyString(), anyString());
final DataPrepperVersion version = DataPrepperVersion.parse("2.0");
final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel));

when(pipelinesProvider.getPipelinesDataFlowModel())
.thenReturn(pipelinesDataFlowModel);
when(httpExchange.getResponseHeaders())
.thenReturn(headers);
when(httpExchange.getRequestMethod())
.thenReturn(httpMethod);

final GetPipelinesHandler handler = createObjectUnderTest();

handler.handle(httpExchange);

verify(headers)
.add(eq("Content-Type"), eq("text/plain; charset=UTF-8"));
verify(httpExchange)
.sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), anyLong());
verify(outputStream)
.write(any(byte[].class));
verify(outputStream)
.close();
}

@ParameterizedTest
@ValueSource(strings = { HttpMethod.DELETE, HttpMethod.PATCH, HttpMethod.PUT, HttpMethod.POST })
public void testGivenProhibitedHttpMethodThenErrorResponseWritten(String httpMethod) throws IOException {
final GetPipelinesHandler handler = createObjectUnderTest();

when(httpExchange.getRequestMethod())
.thenReturn(httpMethod);

handler.handle(httpExchange);

verify(httpExchange)
.sendResponseHeaders(eq(HttpURLConnection.HTTP_BAD_METHOD), eq(0L));
verify(outputStream)
.close();
}

@ParameterizedTest
@ValueSource(strings = { HttpMethod.GET })
public void testGivenExceptionThrownThenErrorResponseWritten(String httpMethod) throws IOException {
when(httpExchange.getRequestMethod())
.thenReturn(httpMethod);

pipelinesProvider = null;
final GetPipelinesHandler handler = createObjectUnderTest();
handler.handle(httpExchange);

verify(httpExchange)
.sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L));
verify(outputStream)
.close();
}
}

0 comments on commit db9a849

Please sign in to comment.