Skip to content

Commit

Permalink
fix(GraylogHTTPPlugin): change the payload send to SEKOIA.IO HTTP intake
Browse files Browse the repository at this point in the history
  • Loading branch information
squioc committed Dec 14, 2021
1 parent ef8f8dc commit 3759c6b
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions src/main/java/com/plugin/HttpOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import okhttp3.*;
Expand Down Expand Up @@ -33,14 +34,14 @@
* interfaces. (i.e. AlarmCallback, MessageInput, MessageOutput)
*/
public class HttpOutput implements MessageOutput {

public static final int HTTP_BATCH_SIZE = 500;
private final OkHttpClient httpClient;
private final Gson gson = new Gson();
private boolean shutdown;
private boolean raise_exception_on_http_error = false;
private String url;
private String intake_key;
private static final String CK_OUTPUT_API = "output_api";
private static final String CK_INTAKE_KEY = "intake_key";
private static final String CK_GZIP_REQUEST = "gzip_request";
private static final String CK_RAISE_EXCEPTION = "raise_exception";
private static final Logger LOG = LoggerFactory.getLogger(HttpOutput.class);
Expand Down Expand Up @@ -92,11 +93,13 @@ public void writeTo(BufferedSink sink) throws IOException {
public HttpOutput(@Assisted Stream stream, @Assisted Configuration conf) throws HttpOutputException {

this.url = conf.getString(CK_OUTPUT_API);
this.intake_key = conf.getString(CK_INTAKE_KEY);
this.raise_exception_on_http_error = conf.getBoolean(CK_RAISE_EXCEPTION);

this.shutdown = false;
LOG.info(" Http Output Plugin has been configured with the following parameters:");
LOG.info(CK_OUTPUT_API + " : " + this.url);
LOG.info(CK_INTAKE_KEY + " : " + this.intake_key);
LOG.info(CK_GZIP_REQUEST + " : " + conf.getBoolean(CK_GZIP_REQUEST));
LOG.info(CK_RAISE_EXCEPTION + " : " + this.raise_exception_on_http_error);

Expand Down Expand Up @@ -132,26 +135,22 @@ public void stop() {
@Override
public void write(List<Message> msgs) throws Exception {

for (List<Message> partition : ListUtils.partition(msgs, HTTP_BATCH_SIZE)) {
List<Map<String, Object>> payload = new FastArrayList();
for (Message msg : partition) {
payload.add(msg.getFields());
}
this.executeRequest(RequestBody.create(
JSON,
this.gson.toJson(payload)
));
for(Message message: msgs) {
this.write(message);
}

}

@Override
public void write(Message msg) throws Exception {

Map<String, Object> payload = new HashMap<>();
payload.put("intake_key", this.intake_key);
payload.put("json", this.gson.toJson(msg.getFields()));

this.executeRequest(
RequestBody.create(
JSON,
this.gson.toJson(msg.getFields())
this.gson.toJson(payload)
)
);
}
Expand Down Expand Up @@ -204,6 +203,10 @@ public ConfigurationRequest getRequestedConfiguration() {
new TextField(CK_OUTPUT_API, "API to forward the stream data.", "/",
"HTTP address where the stream data to be sent.", ConfigurationField.Optional.NOT_OPTIONAL));

configurationRequest.addField(
new TextField(CK_INTAKE_KEY, "Intake key", "",
"The intake key to identify the events", ConfigurationField.Optional.NOT_OPTIONAL));

configurationRequest.addField(new BooleanField(CK_GZIP_REQUEST, "GZIP request", false,
"Enable GZIP compression for HTTP requests."));

Expand Down

0 comments on commit 3759c6b

Please sign in to comment.