diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index e5a3b6bee99..7c3a96f6c9b 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -276,6 +276,8 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { + obj = &result.data; + if (result.data.type == MSGPACK_OBJECT_MAP) { tag_from_record = NULL; if (ctx->tag_key) { @@ -301,7 +303,6 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) flb_log_event_encoder_reset(&ctx->log_encoder); } else if (result.data.type == MSGPACK_OBJECT_ARRAY) { - obj = &result.data; for (i = 0; i < obj->via.array.size; i++) { record = obj->via.array.ptr[i]; @@ -382,6 +383,7 @@ static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, return -1; } else if (ret == -1) { + flb_plg_warn(ctx->ins, "error parsing JSON message, skipping"); return -1; } @@ -515,6 +517,10 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, { int ret = -1; int type = -1; + char *original_data; + size_t original_data_size; + char *out_chunked = NULL; + size_t out_chunked_size; struct mk_http_header *header; header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; @@ -523,8 +529,8 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, return -1; } - if ((header->val.len == 16 && strncasecmp(header->val.data, "application/json", 16) == 0) || - (header->val.len > 16 && (strncasecmp(header->val.data, "application/json ", 17) == 0) || + if (((header->val.len == 16 && strncasecmp(header->val.data, "application/json", 16) == 0)) || + ((header->val.len > 16 && (strncasecmp(header->val.data, "application/json ", 17) == 0)) || strncasecmp(header->val.data, "application/json;", 17) == 0)) { type = HTTP_CONTENT_JSON; } @@ -539,11 +545,33 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, return -1; } - if (request->data.len <= 0) { + if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { send_response(conn, 400, "error: no payload found\n"); return -1; } + /* content: check if the data comes in chunks (transfer-encoding: chunked) */ + if (mk_http_parser_is_content_chunked(&session->parser)) { + ret = mk_http_parser_chunked_decode(&session->parser, + conn->buf_data, + conn->buf_len, + &out_chunked, + &out_chunked_size); + + if (ret == -1) { + send_response(conn, 400, "error: invalid chunked data\n"); + return -1; + } + + /* link the decoded data */ + original_data = request->data.data; + original_data_size = request->data.len; + + request->data.data = out_chunked; + request->data.len = out_chunked_size; + } + + if (type == HTTP_CONTENT_JSON) { ret = parse_payload_json(ctx, tag, request->data.data, request->data.len); } @@ -551,6 +579,12 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len); } + if (out_chunked) { + mk_mem_free(out_chunked); + request->data.data = original_data; + request->data.len = original_data_size; + } + if (ret != 0) { send_response(conn, 400, "error: invalid payload\n"); return -1;