From 94e2a443a3c53c33d888750d5b872a242ce93977 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 4 Oct 2024 16:11:47 -0600 Subject: [PATCH 1/9] in_opentelemetry: add support for HTTP/1.1 chunked transfer encoding Signed-off-by: Eduardo Silva --- plugins/in_opentelemetry/opentelemetry_prot.c | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index 791db4d3f6b..451ceeab3c9 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -1785,14 +1785,16 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c int len; char *uri; char *qs; + char *out_chunked = NULL; + size_t out_chunked_size = 0; off_t diff; + size_t tag_len; flb_sds_t tag; - struct mk_http_header *header; - char *original_data; + char *original_data = NULL; size_t original_data_size; - char *uncompressed_data; + char *uncompressed_data = NULL; size_t uncompressed_data_size; - size_t tag_len; + struct mk_http_header *header; if (request->uri.data[0] != '/') { send_response(conn, 400, "error: invalid request\n"); @@ -1902,6 +1904,32 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c request->data.len = uncompressed_data_size; } + /* check if the request comes with chunked transfer encoding */ + if (mk_http_parser_is_content_chunked(&session->parser)) { + out_chunked = NULL; + out_chunked_size = 0; + + /* decode the chunks */ + ret = mk_http_parser_chunked_decode(&session->parser, + request->data.data, + request->data.len, + &out_chunked, + &out_chunked_size); + if (ret == -1) { + flb_sds_destroy(tag); + mk_mem_free(uri); + send_response(conn, 400, "error: invalid chunked data\n"); + if (uncompressed_data != NULL) { + flb_free(uncompressed_data); + } + return -1; + } + else { + request->data.data = out_chunked; + request->data.len = out_chunked_size; + } + } + if (strcmp(uri, "/v1/metrics") == 0) { ret = process_payload_metrics(ctx, conn, tag, tag_len, session, request); } @@ -1919,6 +1947,10 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c request->data.data = original_data; request->data.len = original_data_size; + if (out_chunked != NULL) { + mk_mem_free(out_chunked); + } + mk_mem_free(uri); flb_sds_destroy(tag); @@ -2423,7 +2455,7 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request, int grpc_request; struct flb_opentelemetry *context; int result = -1; - flb_sds_t tag; + flb_sds_t tag = NULL; context = (struct flb_opentelemetry *) response->stream->user_data; From 6008e9c9a5c223547fcd548e6db296eb74aafdd7 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 4 Oct 2024 16:54:00 -0600 Subject: [PATCH 2/9] in_splunk: add support for HTTP/1.1 chunked transfer encoding Signed-off-by: Eduardo Silva --- plugins/in_splunk/splunk_conn.c | 7 +++- plugins/in_splunk/splunk_prot.c | 62 ++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/plugins/in_splunk/splunk_conn.c b/plugins/in_splunk/splunk_conn.c index 13bf4f0e16c..8294b1112b3 100644 --- a/plugins/in_splunk/splunk_conn.c +++ b/plugins/in_splunk/splunk_conn.c @@ -29,6 +29,7 @@ static void splunk_conn_request_init(struct mk_http_session *session, static int splunk_conn_event(void *data) { + int ret; int status; size_t size; ssize_t available; @@ -95,7 +96,11 @@ static int splunk_conn_event(void *data) if (status == MK_HTTP_PARSER_OK) { /* Do more logic parsing and checks for this request */ - splunk_prot_handle(ctx, conn, &conn->session, &conn->request); + ret = splunk_prot_handle(ctx, conn, &conn->session, &conn->request); + if (ret == -1) { + splunk_conn_del(conn); + return -1; + } /* Evict the processed request from the connection buffer and reinitialize * the HTTP parser. diff --git a/plugins/in_splunk/splunk_prot.c b/plugins/in_splunk/splunk_prot.c index cf614b06679..b550452ff73 100644 --- a/plugins/in_splunk/splunk_prot.c +++ b/plugins/in_splunk/splunk_prot.c @@ -592,7 +592,7 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, type = HTTP_CONTENT_UNKNOWN; } - if (request->data.len <= 0) { + if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { send_response(conn, 400, "error: no payload found\n"); return -2; } @@ -658,8 +658,8 @@ static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *c flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads"); } - if (request->data.len <= 0) { - send_response(conn, 400, "error: no payload found\n"); + if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { + send_response(conn, 400, "2 error: no payload found\n"); return -1; } @@ -709,6 +709,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, int len; char *uri; char *qs; + char *original_data = NULL; + size_t original_data_size; + char *out_chunked = NULL; + size_t out_chunked_size = 0; off_t diff; flb_sds_t tag; struct mk_http_header *header; @@ -830,6 +834,32 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, return -1; } + /* If the request contains chunked transfer encoded data, decode it */\ + if (mk_http_parser_is_content_chunked(&session->parser)) { + ret = mk_http_parser_chunked_decode(&session->parser, + conn->buf_data, + conn->buf_len, + &out_chunked, + &out_chunked_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to decode chunked data"); + send_response(conn, 400, "error: invalid chunked data\n"); + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return -1; + } + + /* Update the request data */ + original_data = request->data.data; + original_data_size = request->data.len; + + /* assign the chunked one */ + request->data.data = out_chunked; + request->data.len = out_chunked_size; + } + /* Handle every ingested payload cleanly */ flb_log_event_encoder_reset(&ctx->log_encoder); @@ -846,12 +876,18 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, else if (strcasecmp(uri, "/services/collector/event/1.0") == 0 || strcasecmp(uri, "/services/collector/event") == 0 || strcasecmp(uri, "/services/collector") == 0) { - ret = process_hec_payload(ctx, conn, tag, session, request); + ret = process_hec_payload(ctx, conn, tag, session, request); if (ret == -2) { flb_sds_destroy(tag); mk_mem_free(uri); + if (out_chunked) { + mk_mem_free(out_chunked); + } + request->data.data = original_data; + request->data.len = original_data_size; + return -1; } @@ -866,6 +902,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, flb_sds_destroy(tag); mk_mem_free(uri); + if (out_chunked) { + mk_mem_free(out_chunked); + } + request->data.data = original_data; + request->data.len = original_data_size; + return -1; } } @@ -875,6 +917,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, flb_sds_destroy(tag); mk_mem_free(uri); + if (out_chunked) { + mk_mem_free(out_chunked); + } + request->data.data = original_data; + request->data.len = original_data_size; + send_response(conn, 400, "error: invalid HTTP method\n"); return -1; } @@ -882,6 +930,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, flb_sds_destroy(tag); mk_mem_free(uri); + if (out_chunked) { + mk_mem_free(out_chunked); + } + request->data.data = original_data; + request->data.len = original_data_size; + return ret; } From 9c347bd2761d796c93e8f6fe4c184e637e2241b3 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sat, 5 Oct 2024 10:23:30 -0600 Subject: [PATCH 3/9] in_http: add support for HTTP/1.1 chunked transfer encoding Signed-off-by: Eduardo Silva --- plugins/in_http/http_prot.c | 42 +++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index e5a3b6bee99..7c3a96f6c9b 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -276,6 +276,8 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { + obj = &result.data; + if (result.data.type == MSGPACK_OBJECT_MAP) { tag_from_record = NULL; if (ctx->tag_key) { @@ -301,7 +303,6 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) flb_log_event_encoder_reset(&ctx->log_encoder); } else if (result.data.type == MSGPACK_OBJECT_ARRAY) { - obj = &result.data; for (i = 0; i < obj->via.array.size; i++) { record = obj->via.array.ptr[i]; @@ -382,6 +383,7 @@ static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, return -1; } else if (ret == -1) { + flb_plg_warn(ctx->ins, "error parsing JSON message, skipping"); return -1; } @@ -515,6 +517,10 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, { int ret = -1; int type = -1; + char *original_data; + size_t original_data_size; + char *out_chunked = NULL; + size_t out_chunked_size; struct mk_http_header *header; header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; @@ -523,8 +529,8 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, return -1; } - if ((header->val.len == 16 && strncasecmp(header->val.data, "application/json", 16) == 0) || - (header->val.len > 16 && (strncasecmp(header->val.data, "application/json ", 17) == 0) || + if (((header->val.len == 16 && strncasecmp(header->val.data, "application/json", 16) == 0)) || + ((header->val.len > 16 && (strncasecmp(header->val.data, "application/json ", 17) == 0)) || strncasecmp(header->val.data, "application/json;", 17) == 0)) { type = HTTP_CONTENT_JSON; } @@ -539,11 +545,33 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, return -1; } - if (request->data.len <= 0) { + if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { send_response(conn, 400, "error: no payload found\n"); return -1; } + /* content: check if the data comes in chunks (transfer-encoding: chunked) */ + if (mk_http_parser_is_content_chunked(&session->parser)) { + ret = mk_http_parser_chunked_decode(&session->parser, + conn->buf_data, + conn->buf_len, + &out_chunked, + &out_chunked_size); + + if (ret == -1) { + send_response(conn, 400, "error: invalid chunked data\n"); + return -1; + } + + /* link the decoded data */ + original_data = request->data.data; + original_data_size = request->data.len; + + request->data.data = out_chunked; + request->data.len = out_chunked_size; + } + + if (type == HTTP_CONTENT_JSON) { ret = parse_payload_json(ctx, tag, request->data.data, request->data.len); } @@ -551,6 +579,12 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len); } + if (out_chunked) { + mk_mem_free(out_chunked); + request->data.data = original_data; + request->data.len = original_data_size; + } + if (ret != 0) { send_response(conn, 400, "error: invalid payload\n"); return -1; From c0e0f09d8c10d3cfed579c07ec5bfccd07c7c51d Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 9 Oct 2024 12:31:41 -0600 Subject: [PATCH 4/9] http_server: http1: add chunked transfer encoding support Signed-off-by: Eduardo Silva --- src/http_server/flb_http_server_http1.c | 134 +++++++++++++++--------- 1 file changed, 87 insertions(+), 47 deletions(-) diff --git a/src/http_server/flb_http_server_http1.c b/src/http_server/flb_http_server_http1.c index 6a61d2a3e93..e5fcba9114d 100644 --- a/src/http_server/flb_http_server_http1.c +++ b/src/http_server/flb_http_server_http1.c @@ -21,7 +21,7 @@ /* PRIVATE */ -static void dummy_mk_http_session_init(struct mk_http_session *session, +static void dummy_mk_http_session_init(struct mk_http_session *session, struct mk_server *server) { session->_sched_init = MK_TRUE; @@ -94,7 +94,7 @@ static int http1_evict_request(struct flb_http1_server_session *session) request_end += content_length; } else { - request_end = (uintptr_t) strstr(session_buffer, + request_end = (uintptr_t) strstr(session_buffer, "\r\n\r\n"); if(request_end != 0) { @@ -119,7 +119,7 @@ static int http1_evict_request(struct flb_http1_server_session *session) else { session_buffer_length -= request_length; - memmove(session_buffer, + memmove(session_buffer, &session_buffer[request_length], session_buffer_length); @@ -137,6 +137,8 @@ static int http1_session_process_request(struct flb_http1_server_session *sessio struct mk_list *iterator; struct mk_http_header *header; int result; + size_t chunked_size; + size_t written_bytes = 0; result = flb_http_request_init(&session->stream.request); @@ -148,16 +150,16 @@ static int http1_session_process_request(struct flb_http1_server_session *sessio if (session->inner_request.uri_processed.data != NULL) { session->stream.request.path = \ - cfl_sds_create_len(session->inner_request.uri_processed.data, + cfl_sds_create_len(session->inner_request.uri_processed.data, session->inner_request.uri_processed.len); } else { session->stream.request.path = \ - cfl_sds_create_len(session->inner_request.uri.data, + cfl_sds_create_len(session->inner_request.uri.data, session->inner_request.uri.len); } - if (session->stream.request.path == NULL) { + if (session->stream.request.path == NULL) { return -1; } @@ -201,7 +203,7 @@ static int http1_session_process_request(struct flb_http1_server_session *sessio session->stream.request.content_length = session->inner_request.content_length; - mk_list_foreach(iterator, + mk_list_foreach(iterator, &session->inner_parser.header_list) { header = mk_list_entry(iterator, struct mk_http_header, _head); @@ -209,34 +211,34 @@ static int http1_session_process_request(struct flb_http1_server_session *sessio header->val.data != NULL && header->val.len > 0) { if (flb_http_server_strncasecmp( - (const uint8_t *) header->key.data, - header->key.len, + (const uint8_t *) header->key.data, + header->key.len, "host", 0) == 0) { session->stream.request.host = \ - cfl_sds_create_len((const char *) header->val.data, + cfl_sds_create_len((const char *) header->val.data, header->val.len); - + if (session->stream.request.host == NULL) { return -1; } } else if (flb_http_server_strncasecmp( - (const uint8_t *) header->key.data, - header->key.len, + (const uint8_t *) header->key.data, + header->key.len, "content-type", 0) == 0) { session->stream.request.content_type = \ - cfl_sds_create_len((const char *) header->val.data, + cfl_sds_create_len((const char *) header->val.data, header->val.len); - + if (session->stream.request.content_type == NULL) { return -1; } } - result = flb_http_request_set_header(&session->stream.request, - header->key.data, - header->key.len, - (void *) header->val.data, + result = flb_http_request_set_header(&session->stream.request, + header->key.data, + header->key.len, + (void *) header->val.data, header->val.len); if (result != 0) { @@ -253,9 +255,41 @@ static int http1_session_process_request(struct flb_http1_server_session *sessio } } - if (session->inner_request.data.data != NULL) { + /* If the content comes in chunks (transfer-encoding: chunked) */ + if (mk_http_parser_is_content_chunked(&session->inner_parser)) { + /* Get the total size of all the chunks */ + chunked_size = mk_http_parser_content_length(&session->inner_parser); + if (chunked_size == 0) { + session->stream.status = HTTP_STREAM_STATUS_ERROR; + return -1; + } + + /* allocate a buffer to get a copy of the decoded chunks */ + session->stream.request.body = cfl_sds_create_size(chunked_size); + if (!session->stream.request.body) { + session->stream.status = HTTP_STREAM_STATUS_ERROR; + return -1; + } + + /* decode the data into the new buffer */ + result = mk_http_parser_chunked_decode_buf(&session->inner_parser, + session->parent->incoming_data, + cfl_sds_len(session->parent->incoming_data), + session->stream.request.body, + chunked_size, + &written_bytes); + if (result == -1) { + session->stream.status = HTTP_STREAM_STATUS_ERROR; + cfl_sds_destroy(session->stream.request.body); + session->stream.request.body = NULL; + return -1; + } + + cfl_sds_len_set(session->stream.request.body, written_bytes); + } + else if (session->inner_request.data.data != NULL) { session->stream.request.body = \ - cfl_sds_create_len(session->inner_request.data.data, + cfl_sds_create_len(session->inner_request.data.data, session->inner_request.data.len); if (session->stream.request.body == NULL) { @@ -271,7 +305,7 @@ static int http1_session_process_request(struct flb_http1_server_session *sessio cfl_list_del(&session->stream.request._head); } - cfl_list_add(&session->stream.request._head, + cfl_list_add(&session->stream.request._head, &session->parent->request_queue); return 0; @@ -280,7 +314,7 @@ static int http1_session_process_request(struct flb_http1_server_session *sessio /* RESPONSE */ struct flb_http_response *flb_http1_response_begin( - struct flb_http1_server_session *session, + struct flb_http1_server_session *session, struct flb_http_stream *stream) { int result; @@ -344,8 +378,8 @@ int flb_http1_response_commit(struct flb_http_response *response) } mk_list_foreach(header_iterator, &response->headers->entries) { - header_entry = mk_list_entry(header_iterator, - struct flb_hash_table_entry, + header_entry = mk_list_entry(header_iterator, + struct flb_hash_table_entry, _head_parent); if (header_entry == NULL) { @@ -354,11 +388,11 @@ int flb_http1_response_commit(struct flb_http_response *response) return -5; } - sds_result = cfl_sds_printf(&response_buffer, - "%.*s: %.*s\r\n", - (int) header_entry->key_len, - (const char *) header_entry->key, - (int) header_entry->val_size, + sds_result = cfl_sds_printf(&response_buffer, + "%.*s: %.*s\r\n", + (int) header_entry->key_len, + (const char *) header_entry->key, + (int) header_entry->val_size, (const char *) header_entry->val); if (sds_result == NULL) { @@ -377,7 +411,7 @@ int flb_http1_response_commit(struct flb_http_response *response) } if (response->body != NULL) { - sds_result = cfl_sds_cat(response_buffer, + sds_result = cfl_sds_cat(response_buffer, response->body, cfl_sds_len(response->body)); @@ -386,12 +420,12 @@ int flb_http1_response_commit(struct flb_http_response *response) return -8; } - + response_buffer = sds_result; } - sds_result = cfl_sds_cat(session->parent->outgoing_data, - response_buffer, + sds_result = cfl_sds_cat(session->parent->outgoing_data, + response_buffer, cfl_sds_len(response_buffer)); cfl_sds_destroy(response_buffer); @@ -406,13 +440,13 @@ int flb_http1_response_commit(struct flb_http_response *response) } -int flb_http1_response_set_header(struct flb_http_response *response, +int flb_http1_response_set_header(struct flb_http_response *response, char *name, size_t name_length, char *value, size_t value_length) { int result; - result = flb_hash_table_add(response->headers, + result = flb_hash_table_add(response->headers, (const char *) name, (int) name_length, (void *) value, (ssize_t) value_length); @@ -423,13 +457,13 @@ int flb_http1_response_set_header(struct flb_http_response *response, return 0; } -int flb_http1_response_set_status(struct flb_http_response *response, +int flb_http1_response_set_status(struct flb_http_response *response, int status) { return 0; } -int flb_http1_response_set_body(struct flb_http_response *response, +int flb_http1_response_set_body(struct flb_http_response *response, unsigned char *body, size_t body_length) { return 0; @@ -437,7 +471,7 @@ int flb_http1_response_set_body(struct flb_http_response *response, /* SESSION */ -int flb_http1_server_session_init(struct flb_http1_server_session *session, +int flb_http1_server_session_init(struct flb_http1_server_session *session, struct flb_http_server_session *parent) { void *user_data; @@ -486,15 +520,15 @@ void flb_http1_server_session_destroy(struct flb_http1_server_session *session) } int flb_http1_server_session_ingest(struct flb_http1_server_session *session, - unsigned char *buffer, - size_t length) + unsigned char *buffer, + size_t length) { int result; - result = mk_http_parser(&session->inner_request, - &session->inner_parser, - session->parent->incoming_data, - cfl_sds_len(session->parent->incoming_data), + result = mk_http_parser(&session->inner_request, + &session->inner_parser, + session->parent->incoming_data, + cfl_sds_len(session->parent->incoming_data), &session->inner_server); if (result == MK_HTTP_PARSER_OK) { @@ -508,10 +542,16 @@ int flb_http1_server_session_ingest(struct flb_http1_server_session *session, http1_evict_request(session); } + else if (result == MK_HTTP_PARSER_PENDING) { + /* + * If the parser is still in a pending state, we just return a success + * status and wait for more data + */ + return HTTP_SERVER_SUCCESS; + } dummy_mk_http_request_init(&session->inner_session, &session->inner_request); - mk_http_parser_init(&session->inner_parser); - return HTTP_SERVER_SUCCESS; + return HTTP_SERVER_SUCCESS; } From 162e1b9010eb1dc63fbc46fa900bc0c742024137 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 9 Oct 2024 16:09:45 -0600 Subject: [PATCH 5/9] in_splunk: adjust HTTP/1.1 buffer based on the request size Signed-off-by: Eduardo Silva --- plugins/in_splunk/splunk_conn.c | 64 ++++++++++++++++----------------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/plugins/in_splunk/splunk_conn.c b/plugins/in_splunk/splunk_conn.c index 8294b1112b3..4c25f8d8e5a 100644 --- a/plugins/in_splunk/splunk_conn.c +++ b/plugins/in_splunk/splunk_conn.c @@ -35,7 +35,6 @@ static int splunk_conn_event(void *data) ssize_t available; ssize_t bytes; char *tmp; - char *request_end; size_t request_len; struct flb_connection *connection; struct splunk_conn *conn; @@ -102,47 +101,44 @@ static int splunk_conn_event(void *data) return -1; } - /* Evict the processed request from the connection buffer and reinitialize + /* + * Evict the processed request from the connection buffer and reinitialize * the HTTP parser. */ - request_end = NULL; + /* Use the last parser position as the request length */ + request_len = mk_http_parser_request_size(&conn->session.parser, + conn->buf_data, + conn->buf_len); - if (NULL != conn->request.data.data) { - request_end = &conn->request.data.data[conn->request.data.len]; + if (request_len == -1 || (request_len > conn->buf_len)) { + /* Unexpected but let's make sure things are safe */ + conn->buf_len = 0; + flb_plg_debug(ctx->ins, "request length exceeds buffer length, closing connection"); + splunk_conn_del(conn); + return -1; } - else { - request_end = strstr(conn->buf_data, "\r\n\r\n"); - if(NULL != request_end) { - request_end = &request_end[4]; - } - } + /* If we have extra bytes in our bytes, adjust the extra bytes */ + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); - if (NULL != request_end) { - request_len = (size_t)(request_end - conn->buf_data); - - if (0 < (conn->buf_len - request_len)) { - memmove(conn->buf_data, &conn->buf_data[request_len], - conn->buf_len - request_len); - - conn->buf_data[conn->buf_len - request_len] = '\0'; - conn->buf_len -= request_len; - } - else { - memset(conn->buf_data, 0, request_len); - - conn->buf_len = 0; - } - - /* Reinitialize the parser so the next request is properly - * handled, the additional memset intends to wipe any left over data - * from the headers parsed in the previous request. - */ - memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); - mk_http_parser_init(&conn->session.parser); - splunk_conn_request_init(&conn->session, &conn->request); + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + conn->buf_len = 0; } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + splunk_conn_request_init(&conn->session, &conn->request); } else if (status == MK_HTTP_PARSER_ERROR) { splunk_prot_handle_error(ctx, conn, &conn->session, &conn->request); From ea81b6226b2cced67e16b56bb71bbfa92bd6466f Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 9 Oct 2024 16:09:59 -0600 Subject: [PATCH 6/9] in_opentelemetry: adjust HTTP/1.1 buffer based on the request size Signed-off-by: Eduardo Silva --- plugins/in_opentelemetry/http_conn.c | 64 +++++++++---------- plugins/in_opentelemetry/opentelemetry_prot.c | 28 ++++---- 2 files changed, 44 insertions(+), 48 deletions(-) diff --git a/plugins/in_opentelemetry/http_conn.c b/plugins/in_opentelemetry/http_conn.c index 89410244bba..9d2865c2f71 100644 --- a/plugins/in_opentelemetry/http_conn.c +++ b/plugins/in_opentelemetry/http_conn.c @@ -35,7 +35,6 @@ static int opentelemetry_conn_event(void *data) ssize_t available; ssize_t bytes; char *tmp; - char *request_end; size_t request_len; struct http_conn *conn; struct mk_event *event; @@ -98,47 +97,44 @@ static int opentelemetry_conn_event(void *data) /* Do more logic parsing and checks for this request */ opentelemetry_prot_handle(ctx, conn, &conn->session, &conn->request); - /* Evict the processed request from the connection buffer and reinitialize + /* + * Evict the processed request from the connection buffer and reinitialize * the HTTP parser. */ - request_end = NULL; + /* Use the last parser position as the request length */ + request_len = mk_http_parser_request_size(&conn->session.parser, + conn->buf_data, + conn->buf_len); - if (NULL != conn->request.data.data) { - request_end = &conn->request.data.data[conn->request.data.len]; + if (request_len == -1 || (request_len > conn->buf_len)) { + /* Unexpected but let's make sure things are safe */ + conn->buf_len = 0; + flb_plg_debug(ctx->ins, "request length exceeds buffer length, closing connection"); + opentelemetry_conn_del(conn); + return -1; } - else { - request_end = strstr(conn->buf_data, "\r\n\r\n"); - if(NULL != request_end) { - request_end = &request_end[4]; - } - } + /* If we have extra bytes in our bytes, adjust the extra bytes */ + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); - if (NULL != request_end) { - request_len = (size_t)(request_end - conn->buf_data); - - if (0 < (conn->buf_len - request_len)) { - memmove(conn->buf_data, &conn->buf_data[request_len], - conn->buf_len - request_len); - - conn->buf_data[conn->buf_len - request_len] = '\0'; - conn->buf_len -= request_len; - } - else { - memset(conn->buf_data, 0, request_len); - - conn->buf_len = 0; - } - - /* Reinitialize the parser so the next request is properly - * handled, the additional memset intends to wipe any left over data - * from the headers parsed in the previous request. - */ - memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); - mk_http_parser_init(&conn->session.parser); - opentelemetry_conn_request_init(&conn->session, &conn->request); + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + conn->buf_len = 0; } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + opentelemetry_conn_request_init(&conn->session, &conn->request); } else if (status == MK_HTTP_PARSER_ERROR) { opentelemetry_prot_handle_error(ctx, conn, &conn->session, &conn->request); diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index 451ceeab3c9..3a2180387e7 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -1895,15 +1895,6 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c original_data = request->data.data; original_data_size = request->data.len; - ret = opentelemetry_prot_uncompress(session, request, - &uncompressed_data, - &uncompressed_data_size); - - if (ret > 0) { - request->data.data = uncompressed_data; - request->data.len = uncompressed_data_size; - } - /* check if the request comes with chunked transfer encoding */ if (mk_http_parser_is_content_chunked(&session->parser)) { out_chunked = NULL; @@ -1911,8 +1902,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c /* decode the chunks */ ret = mk_http_parser_chunked_decode(&session->parser, - request->data.data, - request->data.len, + conn->buf_data, + conn->buf_len, &out_chunked, &out_chunked_size); if (ret == -1) { @@ -1930,6 +1921,15 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c } } + ret = opentelemetry_prot_uncompress(session, request, + &uncompressed_data, + &uncompressed_data_size); + + if (ret > 0) { + request->data.data = uncompressed_data; + request->data.len = uncompressed_data_size; + } + if (strcmp(uri, "/v1/metrics") == 0) { ret = process_payload_metrics(ctx, conn, tag, tag_len, session, request); } @@ -1940,13 +1940,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c ret = process_payload_logs(ctx, conn, tag, tag_len, session, request); } + request->data.data = original_data; + request->data.len = original_data_size; + if (uncompressed_data != NULL) { flb_free(uncompressed_data); } - request->data.data = original_data; - request->data.len = original_data_size; - if (out_chunked != NULL) { mk_mem_free(out_chunked); } From 4535ba51615a16441cf456deb01cb5f2357b3725 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 9 Oct 2024 16:10:09 -0600 Subject: [PATCH 7/9] in_http: adjust HTTP/1.1 buffer based on the request size Signed-off-by: Eduardo Silva --- plugins/in_http/http_conn.c | 64 +++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/plugins/in_http/http_conn.c b/plugins/in_http/http_conn.c index 9513605402f..7b59f5c63ad 100644 --- a/plugins/in_http/http_conn.c +++ b/plugins/in_http/http_conn.c @@ -93,7 +93,6 @@ static int http_conn_event(void *data) size_t size; ssize_t available; ssize_t bytes; - char *request_end; size_t request_len; struct flb_connection *connection; struct http_conn *conn; @@ -153,47 +152,44 @@ static int http_conn_event(void *data) /* Do more logic parsing and checks for this request */ http_prot_handle(ctx, conn, &conn->session, &conn->request); - /* Evict the processed request from the connection buffer and reinitialize + /* + * Evict the processed request from the connection buffer and reinitialize * the HTTP parser. */ - request_end = NULL; + /* Use the last parser position as the request length */ + request_len = mk_http_parser_request_size(&conn->session.parser, + conn->buf_data, + conn->buf_len); - if (NULL != conn->request.data.data) { - request_end = &conn->request.data.data[conn->request.data.len]; + if (request_len == -1 || (request_len > conn->buf_len)) { + /* Unexpected but let's make sure things are safe */ + conn->buf_len = 0; + flb_plg_debug(ctx->ins, "request length exceeds buffer length, closing connection"); + http_conn_del(conn); + return -1; } - else { - request_end = strstr(conn->buf_data, "\r\n\r\n"); - if(NULL != request_end) { - request_end = &request_end[4]; - } - } + /* If we have extra bytes in our bytes, adjust the extra bytes */ + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); - if (NULL != request_end) { - request_len = (size_t)(request_end - conn->buf_data); - - if (0 < (conn->buf_len - request_len)) { - memmove(conn->buf_data, &conn->buf_data[request_len], - conn->buf_len - request_len); - - conn->buf_data[conn->buf_len - request_len] = '\0'; - conn->buf_len -= request_len; - } - else { - memset(conn->buf_data, 0, request_len); - - conn->buf_len = 0; - } - - /* Reinitialize the parser so the next request is properly - * handled, the additional memset intends to wipe any left over data - * from the headers parsed in the previous request. - */ - memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); - mk_http_parser_init(&conn->session.parser); - http_conn_request_init(&conn->session, &conn->request); + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + conn->buf_len = 0; } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + http_conn_request_init(&conn->session, &conn->request); } else if (status == MK_HTTP_PARSER_ERROR) { http_prot_handle_error(ctx, conn, &conn->session, &conn->request); From 206404f97d4cd57853c780da5382c238fccf22ab Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 9 Oct 2024 16:10:51 -0600 Subject: [PATCH 8/9] in_elasticsearch: add support for chunked transfer encoding Signed-off-by: Eduardo Silva --- .../in_elasticsearch_bulk_conn.c | 65 +++++++++---------- .../in_elasticsearch_bulk_prot.c | 43 ++++++++++-- 2 files changed, 70 insertions(+), 38 deletions(-) diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c index fe6e46fb657..980c227ff93 100644 --- a/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c @@ -34,7 +34,6 @@ static int in_elasticsearch_bulk_conn_event(void *data) ssize_t available; ssize_t bytes; char *tmp; - char *request_end; size_t request_len; struct flb_connection *connection; struct in_elasticsearch_bulk_conn *conn; @@ -98,47 +97,45 @@ static int in_elasticsearch_bulk_conn_event(void *data) /* Do more logic parsing and checks for this request */ in_elasticsearch_bulk_prot_handle(ctx, conn, &conn->session, &conn->request); - /* Evict the processed request from the connection buffer and reinitialize + /* + * Evict the processed request from the connection buffer and reinitialize * the HTTP parser. */ - request_end = NULL; + /* Use the last parser position as the request length */ + request_len = mk_http_parser_request_size(&conn->session.parser, + conn->buf_data, + conn->buf_len); - if (NULL != conn->request.data.data) { - request_end = &conn->request.data.data[conn->request.data.len]; + if (request_len == -1 || (request_len > conn->buf_len)) { + /* Unexpected but let's make sure things are safe */ + conn->buf_len = 0; + flb_plg_debug(ctx->ins, "request length exceeds buffer length, closing connection"); + in_elasticsearch_bulk_conn_del(conn); + return -1; } - else { - request_end = strstr(conn->buf_data, "\r\n\r\n"); - if(NULL != request_end) { - request_end = &request_end[4]; - } - } + /* If we have extra bytes in our bytes, adjust the extra bytes */ + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); - if (NULL != request_end) { - request_len = (size_t)(request_end - conn->buf_data); - - if (0 < (conn->buf_len - request_len)) { - memmove(conn->buf_data, &conn->buf_data[request_len], - conn->buf_len - request_len); - - conn->buf_data[conn->buf_len - request_len] = '\0'; - conn->buf_len -= request_len; - } - else { - memset(conn->buf_data, 0, request_len); - - conn->buf_len = 0; - } - - /* Reinitialize the parser so the next request is properly - * handled, the additional memset intends to wipe any left over data - * from the headers parsed in the previous request. - */ - memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); - mk_http_parser_init(&conn->session.parser); - in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request); + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; } + else { + memset(conn->buf_data, 0, request_len); + conn->buf_len = 0; + } + + /* + * Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request); } else if (status == MK_HTTP_PARSER_ERROR) { in_elasticsearch_bulk_prot_handle_error(ctx, conn, &conn->session, &conn->request); diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c index c705af60d81..a2424413e19 100644 --- a/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c @@ -621,6 +621,10 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse int gzip_compressed = FLB_FALSE; void *gz_data = NULL; size_t gz_size = -1; + char *out_chunked = NULL; + size_t out_chunked_size = 0; + char *payload_buf; + size_t payload_size; header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; if (header->key.data == NULL) { @@ -643,7 +647,7 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse return -1; } - if (request->data.len <= 0) { + if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { send_response(conn, 400, "error: no payload found\n"); return -1; } @@ -664,8 +668,32 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse } if (type == HTTP_CONTENT_NDJSON || type == HTTP_CONTENT_JSON) { + /* Check if the data is chunked */ + payload_buf = NULL; + payload_size = 0; + + if (mk_http_parser_is_content_chunked(&session->parser)) { + ret = mk_http_parser_chunked_decode(&session->parser, + conn->buf_data, + conn->buf_len, + &out_chunked, + &out_chunked_size); + + if (ret == -1) { + send_response(conn, 400, "error: invalid chunked data\n"); + return -1; + } + + payload_buf = out_chunked; + payload_size = out_chunked_size; + } + else { + payload_buf = request->data.data; + payload_size = request->data.len; + } + if (gzip_compressed == FLB_TRUE) { - ret = flb_gzip_uncompress((void *) request->data.data, request->data.len, + ret = flb_gzip_uncompress((void *) payload_buf, payload_size, &gz_data, &gz_size); if (ret == -1) { flb_error("[elasticsearch_bulk_prot] gzip uncompress is failed"); @@ -675,10 +703,15 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse flb_free(gz_data); } else { - parse_payload_ndjson(ctx, tag, request->data.data, request->data.len, bulk_statuses); + parse_payload_ndjson(ctx, tag, payload_buf, payload_size, bulk_statuses); } } + /* release chunked data if has been set */ + if (out_chunked) { + mk_mem_free(out_chunked); + } + return 0; } @@ -856,7 +889,8 @@ int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx, mk_mem_free(uri); return -1; } - } else { + } + else { flb_sds_destroy(tag); mk_mem_free(uri); @@ -1056,6 +1090,7 @@ static int process_payload_ng(struct flb_http_request *request, return -1; } + printf("Processing payload 2 : %s\n", request->body); parse_payload_ndjson(context, tag, request->body, cfl_sds_len(request->body), bulk_statuses); return 0; From eff804da93d592666c8613df20827fbd5771fcb7 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 9 Oct 2024 16:20:46 -0600 Subject: [PATCH 9/9] lib: monkey: upgrade to v1.8.1 Signed-off-by: Eduardo Silva --- lib/monkey/CMakeLists.txt | 4 +- lib/monkey/api/test.c | 6 +- lib/monkey/include/monkey/mk_http_parser.h | 85 ++++- lib/monkey/mk_core/mk_event_select.c | 4 +- lib/monkey/mk_server/mk_http.c | 3 + lib/monkey/mk_server/mk_http_parser.c | 364 ++++++++++++++++++++- lib/monkey/plugins/logger/logger.c | 4 +- 7 files changed, 451 insertions(+), 19 deletions(-) diff --git a/lib/monkey/CMakeLists.txt b/lib/monkey/CMakeLists.txt index 95d8cc1e618..028240bcc44 100644 --- a/lib/monkey/CMakeLists.txt +++ b/lib/monkey/CMakeLists.txt @@ -22,8 +22,8 @@ endif() # Monkey Version set(MK_VERSION_MAJOR 1) -set(MK_VERSION_MINOR 7) -set(MK_VERSION_PATCH 2) +set(MK_VERSION_MINOR 8) +set(MK_VERSION_PATCH 1) set(MK_VERSION_STR "${MK_VERSION_MAJOR}.${MK_VERSION_MINOR}.${MK_VERSION_PATCH}") # Output paths diff --git a/lib/monkey/api/test.c b/lib/monkey/api/test.c index ffb96f1352f..6ed0fe0e0a2 100644 --- a/lib/monkey/api/test.c +++ b/lib/monkey/api/test.c @@ -8,7 +8,7 @@ #include #define API_ADDR "127.0.0.1" -#define API_PORT "8080" +#define API_PORT "9080" /* Main context set as global so the signal handler can use it */ mk_ctx_t *ctx; @@ -153,9 +153,9 @@ int main() "Name", "monotop", NULL); - mk_vhost_handler(ctx, vid, "/api/v1/stream_processor/task/[A-Za-z_][0-9A-Za-z_\\-]*", + mk_vhost_handler(ctx, vid, "/api/v1/stream_processor/task/[A-Za-z_][0-9A-Za-z_\\-]*", cb_sp_test_task_detail, NULL); - + mk_vhost_handler(ctx, vid, "/api/v1/stream_processor/task", cb_sp_test_task_main, NULL); diff --git a/lib/monkey/include/monkey/mk_http_parser.h b/lib/monkey/include/monkey/mk_http_parser.h index 6d45c39414c..9e3b365eef7 100644 --- a/lib/monkey/include/monkey/mk_http_parser.h +++ b/lib/monkey/include/monkey/mk_http_parser.h @@ -45,6 +45,16 @@ #define MK_HTTP_PARSER_UPGRADE_H2 1 #define MK_HTTP_PARSER_UPGRADE_H2C 2 +/* Transfer encoding */ +#define MK_HTTP_PARSER_TRANSFER_ENCODING_NONE (0) +#define MK_HTTP_PARSER_TRANSFER_ENCODING_CHUNKED (1 << 0) +#define MK_HTTP_PARSER_TRANSFER_ENCODING_GZIP (1 << 1) + +/* Transfer encoding (almost not used) */ +#define MK_HTTP_PARSER_TRANSFER_ENCODING_COMPRESS (1 << 2) +#define MK_HTTP_PARSER_TRANSFER_ENCODING_DEFLATE (1 << 3) +#define MK_HTTP_PARSER_TRANSFER_ENCODING_IDENTITY (1 << 4) + #define MK_HEADER_EXTRA_SIZE 50 /* Request levels @@ -118,6 +128,7 @@ enum mk_request_headers { MK_HEADER_LAST_MODIFIED_SINCE , MK_HEADER_RANGE , MK_HEADER_REFERER , + MK_HEADER_TRANSFER_ENCODING , MK_HEADER_UPGRADE , MK_HEADER_USER_AGENT , MK_HEADER_SIZEOF , @@ -193,6 +204,21 @@ struct mk_http_parser { */ int header_upgrade; + + /* + * Transfer-Encoding + * ------------------ + * we support the following values (bitwise): + * + * - MK_HTTP_PARSER_TRANSFER_ENCODING_NONE + * - MK_HTTP_PARSER_TRANSFER_ENCODING_CHUNKED + * - MK_HTTP_PARSER_TRANSFER_ENCODING_GZIP + * - MK_HTTP_PARSER_TRANSFER_ENCODING_COMPRESS + * - MK_HTTP_PARSER_TRANSFER_ENCODING_DEFLATE + * - MK_HTTP_PARSER_TRANSFER_ENCODING_IDENTITY + */ + int header_transfer_encoding; + /* probable current header, fly parsing */ int header_key; int header_sep; @@ -210,6 +236,20 @@ struct mk_http_parser { /* Extra headers */ struct mk_http_header headers_extra[MK_HEADER_EXTRA_SIZE]; + + + /* + * total size of bytes received as chunked data; this don't count the + * hex strings + */ + size_t chunk_total_size_received; + + /* Transfer chunked encoding: state for active chunk being processed */ + char *chunk_expected_start; /* pointer to the expected very first chunk in the payload */ + + size_t chunk_expected_size; /* expected size of a chunk being read */ + char *chunk_processed_start; /* beginning of a chunk being read */ + char *chunk_processed_end; /* last position of a chunk that is complete */ }; @@ -333,6 +373,20 @@ static inline void mk_http_parser_init(struct mk_http_parser *p) mk_list_init(&p->header_list); } +int mk_http_parser(struct mk_http_request *req, struct mk_http_parser *p, + char *buffer, int buf_len, struct mk_server *server); + +size_t mk_http_parser_content_length(struct mk_http_parser *p); +int mk_http_parser_is_content_chunked(struct mk_http_parser *p); + +int mk_http_parser_chunked_decode(struct mk_http_parser *p, + char *buf_request, size_t buf_request_len, + char **out_buf, size_t *out_buf_size); + +int mk_http_parser_chunked_decode_buf(struct mk_http_parser *p, + char *buf_request, size_t buf_request_len, + char *out_buf, size_t out_buf_size, size_t *out_buf_len); + static inline int mk_http_parser_more(struct mk_http_parser *p, int len) { if (abs(len - p->i) - 1 > 0) { @@ -342,7 +396,34 @@ static inline int mk_http_parser_more(struct mk_http_parser *p, int len) return MK_FALSE; } -int mk_http_parser(struct mk_http_request *req, struct mk_http_parser *p, - char *buffer, int buf_len, struct mk_server *server); +/* Returns the full size of the HTTP request in bytes "If" mk_http_parser() has returned MK_HTTP_PARSER_OK */ +static inline size_t mk_http_parser_request_size(struct mk_http_parser *p, char *buf_request, size_t buf_request_len) +{ + size_t bytes; + + /* + * if the request is chunked encoded, p->i points to the beginning of the last chunk + * found, so we need to check if the last chunk is complete, if so we can return the + * size of the request + */ + if (mk_http_parser_is_content_chunked(p)) { + if (p->chunk_processed_start < buf_request) { + return -1; + } + + /* Look at the last chunk processed (0\r\n\r\n) */ + bytes = p->chunk_processed_start - buf_request + 5; + if (bytes > buf_request_len) { + return -1; + } + return bytes; + } + else if (p->header_content_length > 0) { + /* p->i points to the last byte after the content body */ + return p->i; + } + + return -1; +} #endif /* MK_HTTP_H */ diff --git a/lib/monkey/mk_core/mk_event_select.c b/lib/monkey/mk_core/mk_event_select.c index e326ba85b0b..f23dafc7352 100644 --- a/lib/monkey/mk_core/mk_event_select.c +++ b/lib/monkey/mk_core/mk_event_select.c @@ -367,7 +367,7 @@ static inline int _mk_event_inject(struct mk_event_loop *loop, if (prevent_duplication) { for (index = 0 ; index < loop->n_events ; index++) { - if (ctx->fired[index]->fd == event->fd) { + if (ctx->fired[index].data == event) { return 0; } } @@ -375,7 +375,7 @@ static inline int _mk_event_inject(struct mk_event_loop *loop, event->mask = mask; - ctx->fired[loop->n_events] = event; + ctx->fired[loop->n_events].data = event; loop->n_events++; diff --git a/lib/monkey/mk_server/mk_http.c b/lib/monkey/mk_server/mk_http.c index 1e2d219ded5..184a695a811 100644 --- a/lib/monkey/mk_server/mk_http.c +++ b/lib/monkey/mk_server/mk_http.c @@ -1150,6 +1150,7 @@ int mk_http_request_end(struct mk_http_session *cs, struct mk_server *server) mk_http_parser_init(&cs->parser); status = mk_http_parser(sr, &cs->parser, cs->body, cs->body_length, server); + if (status == MK_HTTP_PARSER_OK) { ret = mk_http_request_prepare(cs, sr, server); if (ret == MK_EXIT_ABORT) { @@ -1564,8 +1565,10 @@ int mk_http_sched_read(struct mk_sched_conn *conn, else { sr = mk_list_entry_first(&cs->request_list, struct mk_http_request, _head); } + status = mk_http_parser(sr, &cs->parser, cs->body, cs->body_length, server); + if (status == MK_HTTP_PARSER_OK) { MK_TRACE("[FD %i] HTTP_PARSER_OK", socket); if (mk_http_status_completed(cs, conn) == -1) { diff --git a/lib/monkey/mk_server/mk_http_parser.c b/lib/monkey/mk_server/mk_http_parser.c index 4e7aa31616d..f973571e9ed 100644 --- a/lib/monkey/mk_server/mk_http_parser.c +++ b/lib/monkey/mk_server/mk_http_parser.c @@ -74,8 +74,9 @@ struct row_entry mk_headers_table[] = { { 19, "last-modified-since" }, { 5, "range" }, { 7, "referer" }, + { 17, "transfer-encoding" }, { 7, "upgrade" }, - { 10, "user-agent" } + { 10, "user-agent" }, }; static inline void reverse_char_lookup(char *buf, char c, int len, struct mk_http_parser *p) @@ -226,6 +227,7 @@ static inline int header_lookup(struct mk_http_parser *p, char *buffer) p->header_count++; mk_list_add(&header->_head, &p->header_list); + if (i == MK_HEADER_HOST) { /* Handle a possible port number in the Host header */ int sep = str_searchr(header->val.data, ':', header->val.len); @@ -314,6 +316,52 @@ static inline int header_lookup(struct mk_http_parser *p, char *buffer) } } } + else if (i == MK_HEADER_TRANSFER_ENCODING) { + /* Check Transfer-Encoding: chunked */ + pos = mk_string_search_n(header->val.data, + "chunked", + MK_STR_INSENSITIVE, + header->val.len); + if (pos >= 0) { + p->header_transfer_encoding |= MK_HTTP_PARSER_TRANSFER_ENCODING_CHUNKED; + } + + /* Check Transfer-Encoding: gzip */ + pos = mk_string_search_n(header->val.data, + "gzip", + MK_STR_INSENSITIVE, + header->val.len); + if (pos >= 0) { + p->header_transfer_encoding |= MK_HTTP_PARSER_TRANSFER_ENCODING_GZIP; + } + + /* Check Transfer-Encoding: compress */ + pos = mk_string_search_n(header->val.data, + "compress", + MK_STR_INSENSITIVE, + header->val.len); + if (pos >= 0) { + p->header_transfer_encoding |= MK_HTTP_PARSER_TRANSFER_ENCODING_COMPRESS; + } + + /* Check Transfer-Encoding: deflate */ + pos = mk_string_search_n(header->val.data, + "deflate", + MK_STR_INSENSITIVE, + header->val.len); + if (pos >= 0) { + p->header_transfer_encoding |= MK_HTTP_PARSER_TRANSFER_ENCODING_DEFLATE; + } + + /* Check Transfer-Encoding: identity */ + pos = mk_string_search_n(header->val.data, + "identity", + MK_STR_INSENSITIVE, + header->val.len); + if (pos >= 0) { + p->header_transfer_encoding |= MK_HTTP_PARSER_TRANSFER_ENCODING_IDENTITY; + } + } else if (i == MK_HEADER_UPGRADE) { if (header_cmp(MK_UPGRADE_H2C, header->val.data, header->val.len) == 0) { @@ -354,6 +402,290 @@ static inline int header_lookup(struct mk_http_parser *p, char *buffer) return -MK_CLIENT_REQUEST_ENTITY_TOO_LARGE; } + +/* check if the HTTP content is chunked so it contain hexa string length headers */ +int mk_http_parser_is_content_chunked(struct mk_http_parser *p) +{ + return p->header_transfer_encoding & MK_HTTP_PARSER_TRANSFER_ENCODING_CHUNKED; +} + +size_t mk_http_parser_content_length(struct mk_http_parser *p) +{ + /* + * returns the content length of the payload. If the content-length header was + * set, it will return the value of the header. If the content-length header was + * not set and instead the transfer-encoding header was set to chunked, it will + * return the length of the payload withouto counting the chunked headers. + */ + + if (!mk_http_parser_is_content_chunked(p)) { + return p->header_content_length; + } + else { + return p->chunk_total_size_received; + } + + return 0; +} + + +int cb_debug_chunk_complete(char *in, size_t in_len, char *out, size_t out_len, size_t *out_len_processed) +{ + (void) out; + (void) out_len; + char *buf; + + /* copy the chunked content into the buffer */ + buf = mk_mem_alloc(in_len + 1); + if (!buf) { + return -1; + } + + memcpy(buf, in, in_len); + buf[in_len] = '\0'; + + printf("==CHUNK DETECTED CONTENT (length=%zu)==\n'%s'\n---\n", in_len, buf); + mk_mem_free(buf); + + *out_len_processed = in_len; + + return 0; +} + +/* + * Check if the request body is complete, incomplete or if it has an error while processing + * the chunks for a chunked transfer encoded payload + */ +static int http_parser_transfer_encoding_chunked(struct mk_http_parser *p, + char *buf_request, size_t buf_request_len, + int (*cb_chunk_complete)(char *in, size_t in_len, char *out, size_t out_len, size_t *out_len_processed), + char *out_buf, size_t out_buf_size, size_t *out_buf_len) +{ + int64_t len; + int64_t chunk_len; + int64_t pos; + char tmp[32]; + char *ptr; + char *content_start; + size_t available_bytes; + + p->level = REQ_LEVEL_BODY; + +parse_more: + + /* read the payload and check if the request has finished based on the logic of transfer encoding chunked */ + if (!p->chunk_processed_start) { + /* + * if p->chunk_processed_start is not set, it means we are parsing from the beginning. Note that + * p->chunk_expected_start is set, it means the content was already processed before, so we just + * adjust the pointer, otherwise we use the parser iterator index (p->i) for it. + */ + if (p->chunk_expected_start) { + p->chunk_processed_start = p->chunk_expected_start; + } + else { + p->chunk_processed_start = buf_request + p->i; + + /* Mark the very first chunk */ + p->chunk_expected_start = p->chunk_processed_start; + } + + len = buf_request_len - p->i; + if (len == 0) { + return MK_HTTP_PARSER_PENDING; + } + + if (p->chunk_processed_start[0] != '\n') { + return MK_HTTP_PARSER_ERROR; + } + + /* we are at the beginning of a chunk, we need to find the end */ + p->chunk_processed_start++; + len--; + + } + else { + len = buf_request_len - (p->chunk_processed_end - buf_request); + } + + /* find the end of the 'chunk header' (e.g: ffae\r\n\r\n) */ + pos = mk_string_search_n(p->chunk_processed_start, "\r\n", MK_STR_SENSITIVE, len); + if (pos < 0) { + return MK_HTTP_PARSER_PENDING; + } + + /* length of the hex string */ + len = (p->chunk_processed_start + pos) - p->chunk_processed_start; + if (((unsigned long) len > sizeof(tmp) - 1) || len == 0) { + return MK_HTTP_PARSER_ERROR; + } + + /* copy the hex string to a temporary buffer */ + memcpy(tmp, p->chunk_processed_start, len); + tmp[len] = '\0'; + + /* convert the hex string to a number */ + errno = 0; + chunk_len = strtol(tmp, &ptr, 16); + if ((errno == ERANGE && (chunk_len == LONG_MAX || chunk_len == LONG_MIN)) || + (errno != 0)) { + return MK_HTTP_PARSER_ERROR; + } + + if (chunk_len < 0) { + return MK_HTTP_PARSER_ERROR; + } + else if (chunk_len == 0) { + /* we have reached the end of the request, validate the last \r\n\r\n exists */ + len = buf_request_len - (p->chunk_processed_start - buf_request); + + if (len < 5) { + return MK_HTTP_PARSER_PENDING; + } + + /* all or nothing */ + if (strncmp(p->chunk_processed_start, "0\r\n\r\n", 5) != 0) { + return MK_HTTP_PARSER_ERROR; + } + + return MK_HTTP_PARSER_OK; + } + else { + /* set the new markers: required size and start position after the hex string length */ + p->chunk_expected_size = chunk_len; + + /* the content starts after the hex_str_length\r\n */ + content_start = p->chunk_processed_start + pos + 2; + + /* calculate the amount of available bytes 'after' content_start */ + available_bytes = buf_request_len - (content_start - buf_request); + + /* do we have all the remaining data needed in our buffer ? */ + if (available_bytes >= p->chunk_expected_size + 2 /* \r\n */) { + /* we have all the data needed */ + p->chunk_processed_end = content_start + p->chunk_expected_size; + + /* check for delimiter \r\n */ + if (p->chunk_processed_end[0] != '\r' || p->chunk_processed_end[1] != '\n') { + return MK_HTTP_PARSER_ERROR; + } + + /* + * If the callback function has been set, invoke it: this callback might be useful for + * debugging and/or provide a way to copy the chunked content into a buffer + */ + if (cb_chunk_complete) { + cb_chunk_complete(content_start, chunk_len, out_buf, out_buf_size, out_buf_len); + } + + /* set the new start for the new chunk */ + p->chunk_processed_start = p->chunk_processed_end + 2; + p->chunk_total_size_received += chunk_len; + goto parse_more; + } + else { + /* we need more data */ + return MK_HTTP_PARSER_PENDING; + } + + } + /* is our chunk complete ? */ + return MK_HTTP_PARSER_PENDING; + +} + +/* Read the chunked content and invoke callback if it has been set */ +int mk_http_parser_read_chunked_content(struct mk_http_parser *p, + char *buf_request, size_t buf_request_len, + int (*cb_chunk_complete)(char *in, size_t in_len, char *out, size_t out_size, size_t *out_len), + char *out_buf, size_t out_buf_size, size_t *out_buf_len) +{ + p->chunk_processed_start = NULL; + p->chunk_processed_end = NULL; + + return http_parser_transfer_encoding_chunked(p, + buf_request, buf_request_len, + cb_chunk_complete, + out_buf, out_buf_size, out_buf_len); +} + +/* + * Callback function used by mk_http_parser_chunked_decode to provide a new buffer with the content + * of the payload decoded + */ +static int cb_copy_chunk(char *in, size_t in_len, char *out, size_t out_size, size_t *out_len_processed) +{ + (void) out_size; + + /* check we don't overflow the buffer */ + if (*out_len_processed + in_len > out_size) { + return -1; + } + + /* copy the chunk */ + memcpy(out + *out_len_processed, in, in_len); + *out_len_processed += in_len; + + return 0; +} + +/* + * This function assumes that the output buffer size has enough space to copy the desired + * chunked content. We do some sanity checks but if the buffer is smaller the data will + * be truncated. + */ +int mk_http_parser_chunked_decode_buf(struct mk_http_parser *p, + char *buf_request, size_t buf_request_len, + char *out_buf, size_t out_buf_size, size_t *out_buf_len) +{ + int ret; + size_t written_bytes = 0; + + ret = mk_http_parser_read_chunked_content(p, + buf_request, buf_request_len, + cb_copy_chunk, + out_buf, out_buf_size, &written_bytes); + if (ret == MK_HTTP_PARSER_OK) { + *out_buf_len = written_bytes; + return 0; + } + + return -1; +} + +int mk_http_parser_chunked_decode(struct mk_http_parser *p, + char *buf_request, size_t buf_request_len, + char **out_buf, size_t *out_buf_size) +{ + int ret; + char *tmp_buf; + size_t tmp_buf_size = 0; + size_t tmp_written_bytes = 0; + + tmp_buf_size = mk_http_parser_content_length(p); + if (tmp_buf_size == 0) { + return -1; + } + + tmp_buf = mk_mem_alloc(tmp_buf_size); + if (!tmp_buf) { + return -1; + } + + ret = mk_http_parser_chunked_decode_buf(p, + buf_request, buf_request_len, + tmp_buf, tmp_buf_size, &tmp_written_bytes); + if (ret == -1) { + mk_mem_free(tmp_buf); + return -1; + } + + *out_buf = tmp_buf; + *out_buf_size = tmp_written_bytes; + + return 0; +} + /* * This function is invoked everytime the parser evaluate the request is * OK. Here we perform some extra validations mostly based on some logic @@ -361,8 +693,11 @@ static inline int header_lookup(struct mk_http_parser *p, char *buffer) */ static inline int mk_http_parser_ok(struct mk_http_request *req, struct mk_http_parser *p, + char *buf_request, size_t buf_request_len, struct mk_server *server) { + int ret; + /* Validate HTTP Version */ if (req->protocol == MK_HTTP_PROTOCOL_UNKNOWN) { mk_http_error(MK_SERVER_HTTP_VERSION_UNSUP, req->session, req, server); @@ -371,10 +706,20 @@ static inline int mk_http_parser_ok(struct mk_http_request *req, /* POST checks */ if (req->method == MK_METHOD_POST || req->method == MK_METHOD_PUT) { - /* validate Content-Length exists */ - if (p->headers[MK_HEADER_CONTENT_LENGTH].type == 0) { - mk_http_error(MK_CLIENT_LENGTH_REQUIRED, req->session, req, server); - return MK_HTTP_PARSER_ERROR; + /* validate Content-Length exists for non-chunked requests */ + if (mk_http_parser_is_content_chunked(p)) { + p->level = REQ_LEVEL_BODY; + + ret = http_parser_transfer_encoding_chunked(p, + buf_request, buf_request_len, + NULL, NULL, 0, NULL); + return ret; + } + else { + if (p->headers[MK_HEADER_CONTENT_LENGTH].type == 0) { + mk_http_error(MK_CLIENT_LENGTH_REQUIRED, req->session, req, server); + return MK_HTTP_PARSER_ERROR; + } } } @@ -543,7 +888,7 @@ int mk_http_parser(struct mk_http_request *req, struct mk_http_parser *p, break; case MK_ST_BLOCK_END: if (buffer[p->i] == '\n') { - return mk_http_parser_ok(req, p, server); + return mk_http_parser_ok(req, p, buffer, buf_len, server); } else { return MK_HTTP_PARSER_ERROR; @@ -614,6 +959,9 @@ int mk_http_parser(struct mk_http_request *req, struct mk_http_parser *p, p->header_min = MK_HEADER_UPGRADE; p->header_max = MK_HEADER_USER_AGENT; break; + case 't': + header_scope_eq(p, MK_HEADER_TRANSFER_ENCODING); + break; default: p->header_key = -1; p->header_sep = -1; @@ -710,7 +1058,7 @@ int mk_http_parser(struct mk_http_request *req, struct mk_http_parser *p, start_next(); } else { - return mk_http_parser_ok(req, p, server); + return mk_http_parser_ok(req, p, buffer, buf_len, server); } } else { @@ -736,7 +1084,7 @@ int mk_http_parser(struct mk_http_request *req, struct mk_http_parser *p, req->data.len = p->body_received; req->data.data = (buffer + p->start); } - return mk_http_parser_ok(req, p, server); + return mk_http_parser_ok(req, p, buffer, buf_len, server); } } diff --git a/lib/monkey/plugins/logger/logger.c b/lib/monkey/plugins/logger/logger.c index 4dfcfb00192..271b39e1e9b 100644 --- a/lib/monkey/plugins/logger/logger.c +++ b/lib/monkey/plugins/logger/logger.c @@ -197,7 +197,7 @@ static void mk_logger_start_worker(void *args) timeout = clk + mk_logger_timeout; - flog = open(target, O_WRONLY | O_CREAT | O_CLOEXEC, 0644); + flog = open(target, O_WRONLY | O_CREAT | O_CLOEXEC, 0600); if (mk_unlikely(flog == -1)) { mk_warn("Could not open logfile '%s' (%s)", target, strerror(errno)); @@ -327,7 +327,7 @@ int mk_logger_plugin_init(struct plugin_api **api, char *confdir) /* Check masterlog */ if (mk_logger_master_path) { - fd = open(mk_logger_master_path, O_WRONLY | O_CREAT | O_CLOEXEC, 0644); + fd = open(mk_logger_master_path, O_WRONLY | O_CREAT | O_CLOEXEC, 0600); if (fd == -1) { mk_err("Could not open/create master logfile %s", mk_logger_master_path); exit(EXIT_FAILURE);