From 3759c6b468ddbea68e7b49e7ed27d57f00fad829 Mon Sep 17 00:00:00 2001 From: Sebastien Quioc Date: Tue, 14 Dec 2021 14:41:56 +0100 Subject: [PATCH] fix(GraylogHTTPPlugin): change the payload send to SEKOIA.IO HTTP intake --- src/main/java/com/plugin/HttpOutput.java | 29 +++++++++++++----------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/plugin/HttpOutput.java b/src/main/java/com/plugin/HttpOutput.java index 8b8149d..d687791 100644 --- a/src/main/java/com/plugin/HttpOutput.java +++ b/src/main/java/com/plugin/HttpOutput.java @@ -5,6 +5,7 @@ import java.net.URL; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.concurrent.TimeUnit; import okhttp3.*; @@ -33,14 +34,14 @@ * interfaces. (i.e. AlarmCallback, MessageInput, MessageOutput) */ public class HttpOutput implements MessageOutput { - - public static final int HTTP_BATCH_SIZE = 500; private final OkHttpClient httpClient; private final Gson gson = new Gson(); private boolean shutdown; private boolean raise_exception_on_http_error = false; private String url; + private String intake_key; private static final String CK_OUTPUT_API = "output_api"; + private static final String CK_INTAKE_KEY = "intake_key"; private static final String CK_GZIP_REQUEST = "gzip_request"; private static final String CK_RAISE_EXCEPTION = "raise_exception"; private static final Logger LOG = LoggerFactory.getLogger(HttpOutput.class); @@ -92,11 +93,13 @@ public void writeTo(BufferedSink sink) throws IOException { public HttpOutput(@Assisted Stream stream, @Assisted Configuration conf) throws HttpOutputException { this.url = conf.getString(CK_OUTPUT_API); + this.intake_key = conf.getString(CK_INTAKE_KEY); this.raise_exception_on_http_error = conf.getBoolean(CK_RAISE_EXCEPTION); this.shutdown = false; LOG.info(" Http Output Plugin has been configured with the following parameters:"); LOG.info(CK_OUTPUT_API + " : " + this.url); + LOG.info(CK_INTAKE_KEY + " : " + this.intake_key); LOG.info(CK_GZIP_REQUEST + " : " + conf.getBoolean(CK_GZIP_REQUEST)); LOG.info(CK_RAISE_EXCEPTION + " : " + this.raise_exception_on_http_error); @@ -132,26 +135,22 @@ public void stop() { @Override public void write(List msgs) throws Exception { - for (List partition : ListUtils.partition(msgs, HTTP_BATCH_SIZE)) { - List> payload = new FastArrayList(); - for (Message msg : partition) { - payload.add(msg.getFields()); - } - this.executeRequest(RequestBody.create( - JSON, - this.gson.toJson(payload) - )); + for(Message message: msgs) { + this.write(message); } - } @Override public void write(Message msg) throws Exception { + Map payload = new HashMap<>(); + payload.put("intake_key", this.intake_key); + payload.put("json", this.gson.toJson(msg.getFields())); + this.executeRequest( RequestBody.create( JSON, - this.gson.toJson(msg.getFields()) + this.gson.toJson(payload) ) ); } @@ -204,6 +203,10 @@ public ConfigurationRequest getRequestedConfiguration() { new TextField(CK_OUTPUT_API, "API to forward the stream data.", "/", "HTTP address where the stream data to be sent.", ConfigurationField.Optional.NOT_OPTIONAL)); + configurationRequest.addField( + new TextField(CK_INTAKE_KEY, "Intake key", "", + "The intake key to identify the events", ConfigurationField.Optional.NOT_OPTIONAL)); + configurationRequest.addField(new BooleanField(CK_GZIP_REQUEST, "GZIP request", false, "Enable GZIP compression for HTTP requests."));