Skip to content

Commit

Permalink
Refactoring http source functionality and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 committed Jul 22, 2024
1 parent 72aec63 commit 0cd5155
Show file tree
Hide file tree
Showing 12 changed files with 778 additions and 471 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/http-source-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ plugins {

dependencies {
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:armeria-common')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation libs.armeria.core
implementation libs.commons.io
implementation 'software.amazon.awssdk:acm'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.opensearch.dataprepper.http;

public interface BaseHttpService {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package org.opensearch.dataprepper.http;

import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.encoding.DecodingService;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.throttling.ThrottlingService;
import org.opensearch.dataprepper.HttpRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.slf4j.Logger;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Function;

/**
* BaseHttpSource class holds the common http related source functionality including starting the armeria server and authentication handling.
* HTTP based sources should use this functionality when implementing the respective source.
*/
public abstract class BaseHttpSource<T extends Record<?>> implements Source<T> {
public static final String REGEX_HEALTH = "regex:^/(?!health$).*$";
public static final String SERVER_CONNECTIONS = "serverConnections";
private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private final HttpServerConfig sourceConfig;
private final CertificateProviderFactory certificateProviderFactory;
private final ArmeriaHttpAuthenticationProvider authenticationProvider;
private final HttpRequestExceptionHandler httpRequestExceptionHandler;
private final String pipelineName;
private final String sourceName;
private final Logger logger;
private final PluginMetrics pluginMetrics;
private Server server;
private ByteDecoder byteDecoder;

public BaseHttpSource(final HttpServerConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription, final String sourceName, final Logger logger) {
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
this.pipelineName = pipelineDescription.getPipelineName();
this.sourceName = sourceName;
this.logger = logger;
this.byteDecoder = new JsonDecoder();
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
final PluginSetting authenticationPluginSetting;

if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME)) {
logger.warn("Creating {} source without authentication. This is not secure.", sourceName);
logger.warn("In order to set up Http Basic authentication for the {} source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#authentication-configurations", sourceName);
}

if (authenticationConfiguration != null) {
authenticationPluginSetting =
new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings());
} else {
authenticationPluginSetting =
new PluginSetting(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap());
}
authenticationPluginSetting.setPipelineName(pipelineName);
authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting);
httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics);
}

@Override
public void start(final Buffer<T> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
if (server == null) {
final ServerBuilder sb = Server.builder();

sb.disableServerHeader();

if (sourceConfig.isSsl()) {
logger.info("Creating {} source with SSL/TLS enabled.", sourceName);
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
final Certificate certificate = certificateProvider.getCertificate();
// TODO: enable encrypted key with password
sb.https(sourceConfig.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
)
);
} else {
logger.warn("Creating {} source without SSL/TLS. This is not secure.", sourceName);
logger.warn("In order to set up TLS for the {} source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl", sourceName);
sb.http(sourceConfig.getPort());
}

if (sourceConfig.getAuthentication() != null) {
final Optional<Function<? super HttpService, ? extends HttpService>> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator();

if (sourceConfig.isUnauthenticatedHealthCheck()) {
optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator));
} else {
optionalAuthDecorator.ifPresent(sb::decorator);
}
}

sb.maxNumConnections(sourceConfig.getMaxConnectionCount());
sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis()));
if (sourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes());
}
final int threads = sourceConfig.getThreadCount();
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
sb.blockingTaskExecutor(blockingTaskExecutor, true);
final int maxPendingRequests = sourceConfig.getMaxPendingRequests();
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
maxPendingRequests, blockingTaskExecutor.getQueue());
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);

final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
final BaseHttpService httpService = getHttpService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics);

if (CompressionOption.NONE.equals(sourceConfig.getCompression())) {
sb.annotatedService(httpSourcePath, httpService, httpRequestExceptionHandler);
} else {
sb.annotatedService(httpSourcePath, httpService, DecodingService.newDecorator(), httpRequestExceptionHandler);
}

if (sourceConfig.hasHealthCheckService()) {
logger.info("{} source health check is enabled", sourceName);
sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
}

server = sb.build();
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}

try {
server.start().get();
} catch (ExecutionException ex) {
if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
} else {
throw new RuntimeException(ex);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
logger.info("Started {} source on port {}", sourceName, sourceConfig.getPort());
}

@Override
public ByteDecoder getDecoder() {
return byteDecoder;
}

@Override
public void stop() {
if (server != null) {
try {
server.stop().get();
} catch (ExecutionException ex) {
if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
} else {
throw new RuntimeException(ex);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}
logger.info("Stopped {} source.", sourceName);
}

public abstract BaseHttpService getHttpService(int bufferTimeoutInMillis, Buffer<T> buffer, PluginMetrics pluginMetrics);

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,37 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.common.HttpData;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class MultiLineJsonCodec implements Codec<List<Map<String, Object>>> {
public class MultiLineJsonCodec implements Codec<List<Map<String, Object>>> {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String REGEX = "\\r?\\n";
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE =
new TypeReference<Map<String, Object>>() {};
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {
};
private static final Pattern multiLineJsonSplitPattern = Pattern.compile(REGEX);

private static boolean isInvalidLine(final String str) {
return str == null || str.isEmpty() || str.isBlank();
}

@Override
public List<Map<String, Object>> parse(HttpData httpData) throws IOException {
List<Map<String, Object>> jsonListData = new ArrayList<>();

String requestBody = new String(httpData.toInputStream().readAllBytes(), StandardCharsets.UTF_8);
List<String> jsonLines = Arrays.asList(requestBody.split(REGEX));
String[] jsonLines = multiLineJsonSplitPattern.split(requestBody);

for (String jsonLine: jsonLines) {
for (String jsonLine : jsonLines) {
if (isInvalidLine(jsonLine)) {
throw new IOException("Error processing request payload.");
}
jsonListData.add(objectMapper.readValue(jsonLine, MAP_TYPE_REFERENCE));
}
return jsonListData;
}

private static boolean isInvalidLine(final String str) {
return str == null || str.isEmpty() || str.isBlank();
}
}
Loading

0 comments on commit 0cd5155

Please sign in to comment.