Skip to content

Commit

Permalink
in_splunk: add support for HTTP/1.1 chunked transfer encoding
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Oct 4, 2024
1 parent c2d51a8 commit 9d1105d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
7 changes: 6 additions & 1 deletion plugins/in_splunk/splunk_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ static void splunk_conn_request_init(struct mk_http_session *session,

static int splunk_conn_event(void *data)
{
int ret;
int status;
size_t size;
ssize_t available;
Expand Down Expand Up @@ -95,7 +96,11 @@ static int splunk_conn_event(void *data)

if (status == MK_HTTP_PARSER_OK) {
/* Do more logic parsing and checks for this request */
splunk_prot_handle(ctx, conn, &conn->session, &conn->request);
ret = splunk_prot_handle(ctx, conn, &conn->session, &conn->request);
if (ret == -1) {
splunk_conn_del(conn);
return -1;
}

/* Evict the processed request from the connection buffer and reinitialize
* the HTTP parser.
Expand Down
62 changes: 58 additions & 4 deletions plugins/in_splunk/splunk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
type = HTTP_CONTENT_UNKNOWN;
}

if (request->data.len <= 0) {
if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) {
send_response(conn, 400, "error: no payload found\n");
return -2;
}
Expand Down Expand Up @@ -658,8 +658,8 @@ static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *c
flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads");
}

if (request->data.len <= 0) {
send_response(conn, 400, "error: no payload found\n");
if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) {
send_response(conn, 400, "2 error: no payload found\n");
return -1;
}

Expand Down Expand Up @@ -709,6 +709,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
int len;
char *uri;
char *qs;
char *original_data = NULL;
size_t original_data_size;
char *out_chunked = NULL;
size_t out_chunked_size = 0;
off_t diff;
flb_sds_t tag;
struct mk_http_header *header;
Expand Down Expand Up @@ -830,6 +834,32 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

/* If the request contains chunked transfer encoded data, decode it */\
if (mk_http_parser_is_content_chunked(&session->parser)) {
ret = mk_http_parser_chunked_decode(&session->parser,
conn->buf_data,
conn->buf_len,
&out_chunked,
&out_chunked_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "failed to decode chunked data");
send_response(conn, 400, "error: invalid chunked data\n");

flb_sds_destroy(tag);
mk_mem_free(uri);

return -1;
}

/* Update the request data */
original_data = request->data.data;
original_data_size = request->data.len;

/* assign the chunked one */
request->data.data = out_chunked;
request->data.len = out_chunked_size;
}

/* Handle every ingested payload cleanly */
flb_log_event_encoder_reset(&ctx->log_encoder);

Expand All @@ -846,12 +876,18 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
else if (strcasecmp(uri, "/services/collector/event/1.0") == 0 ||
strcasecmp(uri, "/services/collector/event") == 0 ||
strcasecmp(uri, "/services/collector") == 0) {
ret = process_hec_payload(ctx, conn, tag, session, request);

ret = process_hec_payload(ctx, conn, tag, session, request);
if (ret == -2) {
flb_sds_destroy(tag);
mk_mem_free(uri);

if (out_chunked) {
mk_mem_free(out_chunked);
}
request->data.data = original_data;
request->data.len = original_data_size;

return -1;
}

Expand All @@ -866,6 +902,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
flb_sds_destroy(tag);
mk_mem_free(uri);

if (out_chunked) {
mk_mem_free(out_chunked);
}
request->data.data = original_data;
request->data.len = original_data_size;

return -1;
}
}
Expand All @@ -875,13 +917,25 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
flb_sds_destroy(tag);
mk_mem_free(uri);

if (out_chunked) {
mk_mem_free(out_chunked);
}
request->data.data = original_data;
request->data.len = original_data_size;

send_response(conn, 400, "error: invalid HTTP method\n");
return -1;
}

flb_sds_destroy(tag);
mk_mem_free(uri);

if (out_chunked) {
mk_mem_free(out_chunked);
}
request->data.data = original_data;
request->data.len = original_data_size;

return ret;
}

Expand Down

0 comments on commit 9d1105d

Please sign in to comment.