Skip to content

Commit

Permalink
Merge pull request #4 from SekoiaLab/fix/HTTPOutputCompatibilityWithH…
Browse files Browse the repository at this point in the history
…TTPIntake

GraylogHttpOutput: fix the plugin to be able to push on SEKOIA.Io HTTP Intake
  • Loading branch information
otetard authored Dec 14, 2021
2 parents 0ec08b5 + 3d5d2e3 commit e5fcdda
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 63 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.plugin</groupId>
<artifactId>graylog-plugin-http-output</artifactId>
<version>1.0.3-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.artifactId}</name>
Expand Down
77 changes: 15 additions & 62 deletions src/main/java/com/plugin/HttpOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import okhttp3.*;
Expand Down Expand Up @@ -33,71 +34,29 @@
* interfaces. (i.e. AlarmCallback, MessageInput, MessageOutput)
*/
public class HttpOutput implements MessageOutput {

public static final int HTTP_BATCH_SIZE = 500;
private final OkHttpClient httpClient;
private final Gson gson = new Gson();
private boolean shutdown;
private boolean raise_exception_on_http_error = false;
private String url;
private String intake_key;
private static final String CK_OUTPUT_API = "output_api";
private static final String CK_GZIP_REQUEST = "gzip_request";
private static final String CK_INTAKE_KEY = "intake_key";
private static final String CK_RAISE_EXCEPTION = "raise_exception";
private static final Logger LOG = LoggerFactory.getLogger(HttpOutput.class);
private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");

/**
* This interceptor compresses the HTTP request body. Many webservers can't handle this!
* <p>
* taken from https://square.github.io/okhttp/interceptors/
*/
final class GzipRequestInterceptor implements Interceptor {
@Override
public Response intercept(Interceptor.Chain chain) throws IOException {
Request originalRequest = chain.request();
if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
return chain.proceed(originalRequest);
}

Request compressedRequest = originalRequest.newBuilder()
.header("Content-Encoding", "gzip")
.method(originalRequest.method(), gzip(originalRequest.body()))
.build();
return chain.proceed(compressedRequest);
}

private RequestBody gzip(final RequestBody body) {
return new RequestBody() {
@Override
public MediaType contentType() {
return body.contentType();
}

@Override
public long contentLength() {
return -1; // We don't know the compressed length in advance!
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
body.writeTo(gzipSink);
gzipSink.close();
}
};
}
}

@Inject
public HttpOutput(@Assisted Stream stream, @Assisted Configuration conf) throws HttpOutputException {

this.url = conf.getString(CK_OUTPUT_API);
this.intake_key = conf.getString(CK_INTAKE_KEY);
this.raise_exception_on_http_error = conf.getBoolean(CK_RAISE_EXCEPTION);

this.shutdown = false;
LOG.info(" Http Output Plugin has been configured with the following parameters:");
LOG.info(CK_OUTPUT_API + " : " + this.url);
LOG.info(CK_GZIP_REQUEST + " : " + conf.getBoolean(CK_GZIP_REQUEST));
LOG.info(CK_INTAKE_KEY + " : " + this.intake_key);
LOG.info(CK_RAISE_EXCEPTION + " : " + this.raise_exception_on_http_error);

try {
Expand All @@ -112,9 +71,6 @@ public HttpOutput(@Assisted Stream stream, @Assisted Configuration conf) throws
.writeTimeout(10, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS);

if (conf.getBoolean(CK_GZIP_REQUEST)) {
clientBuilder.addInterceptor(new GzipRequestInterceptor());
}
this.httpClient = clientBuilder.build();
}

Expand All @@ -132,26 +88,22 @@ public void stop() {
@Override
public void write(List<Message> msgs) throws Exception {

for (List<Message> partition : ListUtils.partition(msgs, HTTP_BATCH_SIZE)) {
List<Map<String, Object>> payload = new FastArrayList();
for (Message msg : partition) {
payload.add(msg.getFields());
}
this.executeRequest(RequestBody.create(
JSON,
this.gson.toJson(payload)
));
for(Message message: msgs) {
this.write(message);
}

}

@Override
public void write(Message msg) throws Exception {

Map<String, Object> payload = new HashMap<>();
payload.put("intake_key", this.intake_key);
payload.put("json", this.gson.toJson(msg.getFields()));

this.executeRequest(
RequestBody.create(
JSON,
this.gson.toJson(msg.getFields())
this.gson.toJson(payload)
)
);
}
Expand Down Expand Up @@ -204,8 +156,9 @@ public ConfigurationRequest getRequestedConfiguration() {
new TextField(CK_OUTPUT_API, "API to forward the stream data.", "/",
"HTTP address where the stream data to be sent.", ConfigurationField.Optional.NOT_OPTIONAL));

configurationRequest.addField(new BooleanField(CK_GZIP_REQUEST, "GZIP request", false,
"Enable GZIP compression for HTTP requests."));
configurationRequest.addField(
new TextField(CK_INTAKE_KEY, "Intake key", "",
"The intake key to identify the events", ConfigurationField.Optional.NOT_OPTIONAL));

configurationRequest.addField(new BooleanField(CK_RAISE_EXCEPTION, "Raise exception", false,
"Raise an exception on HTTP error"));
Expand Down

0 comments on commit e5fcdda

Please sign in to comment.