From 8dc76c12400b0ffeab3b505f0d89a6d456ab0531 Mon Sep 17 00:00:00 2001 From: Juan Jose Nicola Date: Tue, 8 Oct 2024 14:17:19 -0300 Subject: [PATCH] Receive the feed by chunks --- src/manage_sql_nvts.c | 228 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 189 insertions(+), 39 deletions(-) diff --git a/src/manage_sql_nvts.c b/src/manage_sql_nvts.c index f0de40f62..3b9c01ec9 100644 --- a/src/manage_sql_nvts.c +++ b/src/manage_sql_nvts.c @@ -26,6 +26,9 @@ /** * @brief Enable extra GNU functions. */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#define _FILE_OFFSET_BITS 64 +#include #include #include "glibconfig.h" @@ -2147,18 +2150,111 @@ update_nvts_from_vts (element_t *get_vts_response, return 0; } + +/** + * @brief Struct containing the stream buffer. + */ +struct FILESTREAM { + char *stream_buffer; + size_t size_of_buffer; + size_t last_read; + size_t last_write; +}; + +/** + * @brief Hook function to read the stream file cookie + */ +static ssize_t +readcookie (void *stream_cookie, char *buf, size_t size) +{ + struct FILESTREAM *stream = stream_cookie; + size_t to_read = stream->last_write - stream->last_read; + if (to_read < 0) + to_read = 0; + + if (to_read > size) + to_read = size; + memcpy (buf, &stream->stream_buffer[stream->last_read], to_read); + + stream->last_read += to_read; + return to_read; +} + /** - * @brief Update NVTs from Json response + * @brief Hook function to close the stream file cookie + */ +static int +closecookie(void *filestream) +{ + struct FILESTREAM *stream = filestream; + g_free(stream->stream_buffer); + stream->size_of_buffer = 0; + stream->stream_buffer = NULL; + return 0; +} + +/** + * @brief Hook function to write the stream file cookie + */ +static ssize_t +writecookie (void *stream_cookie, const char *buf, size_t size) +{ + struct FILESTREAM *stream = stream_cookie; + size_t next_size = stream->last_write + size; + if (next_size > stream->size_of_buffer) + { + stream->size_of_buffer = next_size + GVM_JSON_PULL_PARSE_BUFFER_LIMIT; + stream->stream_buffer = g_realloc (stream->stream_buffer, + stream->size_of_buffer); + if (stream->stream_buffer == NULL) + { + g_message ("%s: Buffer overflow", __func__); + return 0; + } + } + + memcpy (&(stream->stream_buffer[stream->last_write]), buf, size); + stream->last_write+=size; + + return size; +} + +/** + * @brief Move non read data to beggining of the buffer + */ +static int move_buffer_data(struct FILESTREAM *filestream){ + char *auxbuf; + size_t non_read_chars_count = filestream->last_write - filestream->last_read; + + auxbuf = g_malloc0 (sizeof(char) * filestream->size_of_buffer); + if (auxbuf == NULL) + return -1; + + memcpy (auxbuf, &filestream->stream_buffer[filestream->last_read], + non_read_chars_count); + memset (filestream->stream_buffer, '\0', filestream->size_of_buffer); + memcpy (filestream->stream_buffer, auxbuf, non_read_chars_count); + + filestream->last_read = 0; + filestream->last_write = non_read_chars_count; + + g_free(auxbuf); + + return 0; +} + +/** + * @brief Update NVTs from Json response chunk by chunk * - * @param[in] get_vts_response Openvasd VTS response. + * @param[in] curl_hnd Curl handler to perform the request + * @param[in] res Struct containing the response chunks * @param[in] scanner_feed_version Version of feed from scanner. * @param[in] rebuild Whether we're rebuilding the tables. * * @return 0 success, 1 VT integrity check failed, -1 error */ static int -update_nvts_from_json_vts (gvm_json_pull_parser_t *parser, - gvm_json_pull_event_t *event, +update_nvts_from_json_vts (curlm_t curl_hnd, stringstream res, const gchar *scanner_feed_version, int rebuild) { @@ -2206,36 +2302,94 @@ update_nvts_from_json_vts (gvm_json_pull_parser_t *parser, vt_refs_batch = batch_start (vt_ref_insert_size); vt_sevs_batch = batch_start (vt_sev_insert_size); - nvti_t *nvti = openvasd_parse_vt (parser,event); - while (nvti) - { - if (nvti == NULL) - continue; + int running = 0; + gvm_json_pull_event_t event; + gvm_json_pull_parser_t parser; + FILE *stream = NULL; + struct FILESTREAM *filestream; + nvti_t *nvti = NULL; + + cookie_io_functions_t cookiehooks = { + .read = readcookie, + .write = writecookie, + .seek = NULL, + .close = closecookie, + }; + + filestream = g_malloc0 (sizeof(struct FILESTREAM)); + filestream->size_of_buffer = GVM_JSON_PULL_PARSE_BUFFER_LIMIT; + filestream->stream_buffer = + g_malloc0 (sizeof(char) * filestream->size_of_buffer); + + stream = fopencookie (filestream, "a+", cookiehooks); + + gvm_json_pull_parser_init_full (&parser, stream, + GVM_JSON_PULL_PARSE_BUFFER_LIMIT, + GVM_JSON_PULL_READ_BUFFER_SIZE * 8); + gvm_json_pull_event_init (&event); - if (nvti_creation_time (nvti) > feed_version_epoch) - count_new_vts += 1; - else - count_modified_vts += 1; + // First run for initial data in the stream + running = openvasd_get_vts_stream (curl_hnd); + fwrite (res.ptr, 1, res.len, stream); + g_free (res.ptr); + init_stringstream (&res); - insert_nvt (nvti, rebuild, vt_refs_batch, vt_sevs_batch); + while (running) + { + size_t non_read_count = 0; + // Ensure a big chunk of data. + // Realloc is expensive therefore we realloc with bigger chuncks + while (running && res.len < GVM_JSON_PULL_READ_BUFFER_SIZE * 8) + running = openvasd_get_vts_stream (curl_hnd); - preferences = NULL; - if (update_preferences_from_json_nvt (nvti, &preferences)) + if (res.len > 0) { - sql_rollback (); - return -1; + move_buffer_data (filestream); + fwrite (res.ptr, 1, res.len, stream); + g_free (res.ptr); + init_stringstream (&res); } - if (rebuild == 0) - sql ("DELETE FROM nvt_preferences%s WHERE name LIKE '%s:%%';", - rebuild ? "_rebuild" : "", - nvti_oid (nvti)); - insert_nvt_preferences_list (preferences, rebuild); - g_list_free_full (preferences, (GDestroyNotify) preference_free); - nvti_free (nvti); - nvti = openvasd_parse_vt (parser, event); + non_read_count = filestream->last_write - filestream->last_read; + // While streaming, parse some VTs and continue for a new chunk. + // If the stream is not running anymore, parse the remaining VTs. + while ((running && non_read_count > GVM_JSON_PULL_READ_BUFFER_SIZE * 8) || !running) + { + if (event.type != GVM_JSON_PULL_EVENT_EOF) + nvti = openvasd_parse_vt (&parser, &event); + if (nvti == NULL) + break; + + if (nvti_creation_time (nvti) > feed_version_epoch) + count_new_vts += 1; + else + count_modified_vts += 1; + + insert_nvt (nvti, rebuild, vt_refs_batch, vt_sevs_batch); + + preferences = NULL; + if (update_preferences_from_json_nvt (nvti, &preferences)) + { + sql_rollback (); + return -1; + } + if (rebuild == 0) + sql ("DELETE FROM nvt_preferences%s WHERE name LIKE '%s:%%';", + rebuild ? "_rebuild" : "", + nvti_oid (nvti)); + insert_nvt_preferences_list (preferences, rebuild); + g_list_free_full (preferences, (GDestroyNotify) preference_free); + + g_free(nvti); + non_read_count = filestream->last_write - filestream->last_read; + } } + gvm_json_pull_event_cleanup(&event); + gvm_json_pull_parser_cleanup(&parser); + fclose(stream); + + batch_end (vt_refs_batch); batch_end (vt_sevs_batch); @@ -2662,25 +2816,21 @@ update_nvt_cache_openvasd (gchar* openvasd_uuid, gchar *db_feed_version, return -1; } - resp = openvasd_get_vts (&connector); - if (resp->code != 200) + curlm_t curl_hnd = NULL; + stringstream res; + + init_stringstream (&res); + resp = openvasd_get_vts_stream_init(&connector, &curl_hnd, &res); + if (resp->code < 0) { g_warning ("%s: failed to get VTs", __func__); return -1; } - - FILE *stream; - gvm_json_pull_event_t event; - gvm_json_pull_parser_t parser; - - stream = fmemopen (resp->body, strlen (resp->body), "r"); - gvm_json_pull_parser_init (&parser, stream); - gvm_json_pull_event_init (&event); - ret = update_nvts_from_json_vts (&parser, &event, scanner_feed_version, + ret = update_nvts_from_json_vts (curl_hnd, res, scanner_feed_version, rebuild); - fclose(stream); - gvm_json_pull_parser_cleanup (&parser); + g_free (res.ptr); + openvasd_curl_handler_close (&curl_hnd); openvasd_response_free (resp); if (ret)