Skip to content

Commit

Permalink
Receive the feed by chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnicola committed Oct 8, 2024
1 parent d5d1f2a commit e489632
Showing 1 changed file with 219 additions and 37 deletions.
256 changes: 219 additions & 37 deletions src/manage_sql_nvts.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
/**
* @brief Enable extra GNU functions.
*/
#define _GNU_SOURCE /* See feature_test_macros(7) */
#define _FILE_OFFSET_BITS 64
#include <stdio.h>

#include <gvm/base/nvti.h>
#include "glibconfig.h"
Expand Down Expand Up @@ -2147,6 +2150,128 @@ update_nvts_from_vts (element_t *get_vts_response,
return 0;
}



struct FILESTREAM {
char *stream_buffer;
size_t size_of_buffer;
size_t last_read;
size_t last_write;
};

static ssize_t
read_with_error_on_eof (void *stream_cookie, char *buf, size_t size)
{
struct FILESTREAM *stream = stream_cookie;
size_t to_read = stream->last_write - stream->last_read;
if (to_read < 0)
to_read = 0;

if (to_read > size)
to_read = size;
memcpy (buf, &stream->stream_buffer[stream->last_read], to_read);

stream->last_read += to_read;
return to_read;
}


static int
closecookie(void *filestream)
{
struct FILESTREAM *stream = filestream;
g_free(stream->stream_buffer);
stream->size_of_buffer = 0;
stream->stream_buffer = NULL;
return 0;
}

static ssize_t
write_with_error_on_eof (void *stream_cookie, const char *buf, size_t size)
{
struct FILESTREAM *stream = stream_cookie;
size_t next_size = stream->last_write + size;
if (next_size > stream->size_of_buffer)
{
stream->size_of_buffer = next_size + GVM_JSON_PULL_PARSE_BUFFER_LIMIT;
stream->stream_buffer = g_realloc (stream->stream_buffer,
stream->size_of_buffer);
if (stream->stream_buffer == NULL)
{
g_message ("%s: Buffer overflow", __func__);
return 0;
}
}

memcpy (&(stream->stream_buffer[stream->last_write]), buf, size);
stream->last_write+=size;

return size;
}


/**
* @brief Reduce the buffer size, removing the already read
* chars and reallocating
* to the new size, releasing memory
*/
static int
modify_buffer_size (struct FILESTREAM *filestream)
{
size_t old_buffer_size = filestream->size_of_buffer;
size_t non_read_chars_count = filestream->last_write - filestream->last_read;

if (non_read_chars_count < filestream->size_of_buffer - 2 * GVM_JSON_PULL_PARSE_BUFFER_LIMIT
&& non_read_chars_count > GVM_JSON_PULL_PARSE_BUFFER_LIMIT * 2)
{
char *auxbuf;

filestream->size_of_buffer =
filestream->size_of_buffer - GVM_JSON_PULL_PARSE_BUFFER_LIMIT;

auxbuf = g_malloc0 (sizeof(char) * old_buffer_size);
if (auxbuf == NULL)
return -1;

memcpy (auxbuf, &filestream->stream_buffer[filestream->last_read],
non_read_chars_count);
g_free(filestream->stream_buffer);
filestream->stream_buffer =
g_malloc0 (sizeof(char) * filestream->size_of_buffer);
memcpy (filestream->stream_buffer, auxbuf, non_read_chars_count);

filestream->last_read = 0;
filestream->last_write = non_read_chars_count;

g_free(auxbuf);
}
return 0;
}

/**
* @brief Move non read data to beggining of the buffer
*/
static int move_buffer_data(struct FILESTREAM *filestream){
char *auxbuf;
size_t non_read_chars_count = filestream->last_write - filestream->last_read;

auxbuf = g_malloc0 (sizeof(char) * filestream->size_of_buffer);
if (auxbuf == NULL)
return -1;

memcpy (auxbuf, &filestream->stream_buffer[filestream->last_read],
non_read_chars_count);
memset (filestream->stream_buffer, '\0', filestream->size_of_buffer);
memcpy (filestream->stream_buffer, auxbuf, non_read_chars_count);

filestream->last_read = 0;
filestream->last_write = non_read_chars_count;

g_free(auxbuf);

return 0;
}

/**
* @brief Update NVTs from Json response
*
Expand All @@ -2157,8 +2282,7 @@ update_nvts_from_vts (element_t *get_vts_response,
* @return 0 success, 1 VT integrity check failed, -1 error
*/
static int
update_nvts_from_json_vts (gvm_json_pull_parser_t *parser,
gvm_json_pull_event_t *event,
update_nvts_from_json_vts (curlm_t curl_hnd, stringstream res,
const gchar *scanner_feed_version,
int rebuild)
{
Expand Down Expand Up @@ -2206,35 +2330,97 @@ update_nvts_from_json_vts (gvm_json_pull_parser_t *parser,
vt_refs_batch = batch_start (vt_ref_insert_size);
vt_sevs_batch = batch_start (vt_sev_insert_size);

nvti_t *nvti = openvasd_parse_vt (parser,event);
while (nvti)
{
if (nvti == NULL)
continue;
int running = 0;
gvm_json_pull_event_t event;
gvm_json_pull_parser_t parser;
FILE *stream = NULL;
struct FILESTREAM *filestream;
nvti_t *nvti = NULL;

cookie_io_functions_t cookiehooks = {
.read = read_with_error_on_eof,
.write = write_with_error_on_eof,
.seek = NULL,
.close = closecookie,
};

filestream = g_malloc0 (sizeof(struct FILESTREAM));
filestream->size_of_buffer = GVM_JSON_PULL_PARSE_BUFFER_LIMIT;
filestream->stream_buffer =
g_malloc0 (sizeof(char) * filestream->size_of_buffer);

stream = fopencookie (filestream, "a+", cookiehooks);

gvm_json_pull_parser_init_full (&parser, stream,
GVM_JSON_PULL_PARSE_BUFFER_LIMIT,
GVM_JSON_PULL_READ_BUFFER_SIZE * 8);
gvm_json_pull_event_init (&event);

if (nvti_creation_time (nvti) > feed_version_epoch)
count_new_vts += 1;
else
count_modified_vts += 1;
// First run for initial data in the stream
running = openvasd_get_vts_stream (curl_hnd);
fwrite (res.ptr, 1, res.len, stream);
g_free (res.ptr);
init_stringstream (&res);

insert_nvt (nvti, rebuild, vt_refs_batch, vt_sevs_batch);
while (running)
{
size_t non_read_count = 0;
// Ensure a big chunk of data.
// Realloc is expensive therefore we realloc with bigger chuncks
while (running > 0 && res.len < GVM_JSON_PULL_READ_BUFFER_SIZE * 8)
running = openvasd_get_vts_stream (curl_hnd);

preferences = NULL;
if (update_preferences_from_json_nvt (nvti, &preferences))
if (res.len > 0)
{
sql_rollback ();
return -1;
move_buffer_data (filestream);
fwrite (res.ptr, 1, res.len, stream);
g_free (res.ptr);
init_stringstream (&res);
}
if (rebuild == 0)
sql ("DELETE FROM nvt_preferences%s WHERE name LIKE '%s:%%';",
rebuild ? "_rebuild" : "",
nvti_oid (nvti));
insert_nvt_preferences_list (preferences, rebuild);
g_list_free_full (preferences, (GDestroyNotify) preference_free);

nvti_free (nvti);
nvti = openvasd_parse_vt (parser, event);
non_read_count = filestream->last_write - filestream->last_read;
while (running && non_read_count > GVM_JSON_PULL_READ_BUFFER_SIZE * 8)
{
int nvti_count = 0;
if (nvti_count == 20)
{
move_buffer_data (filestream);
modify_buffer_size (filestream);
nvti_count = 0;
}

nvti = openvasd_parse_vt (&parser, &event);
if (nvti == NULL)
break;

if (nvti_creation_time (nvti) > feed_version_epoch)
count_new_vts += 1;
else
count_modified_vts += 1;

insert_nvt (nvti, rebuild, vt_refs_batch, vt_sevs_batch);

preferences = NULL;
if (update_preferences_from_json_nvt (nvti, &preferences))
{
sql_rollback ();
return -1;
}
if (rebuild == 0)
sql ("DELETE FROM nvt_preferences%s WHERE name LIKE '%s:%%';",
rebuild ? "_rebuild" : "",
nvti_oid (nvti));
insert_nvt_preferences_list (preferences, rebuild);
g_list_free_full (preferences, (GDestroyNotify) preference_free);

g_free(nvti);
non_read_count = filestream->last_write - filestream->last_read;
}
}
gvm_json_pull_event_cleanup(&event);
gvm_json_pull_parser_cleanup(&parser);
fclose(stream);


batch_end (vt_refs_batch);
batch_end (vt_sevs_batch);
Expand Down Expand Up @@ -2662,25 +2848,21 @@ update_nvt_cache_openvasd (gchar* openvasd_uuid, gchar *db_feed_version,
return -1;
}

resp = openvasd_get_vts (&connector);
if (resp->code != 200)
curlm_t curl_hnd = NULL;
stringstream res;

init_stringstream (&res);
resp = openvasd_get_vts_stream_init(&connector, &curl_hnd, &res);
if (resp->code < 0)
{
g_warning ("%s: failed to get VTs", __func__);
return -1;
}

FILE *stream;
gvm_json_pull_event_t event;
gvm_json_pull_parser_t parser;

stream = fmemopen (resp->body, strlen (resp->body), "r");
gvm_json_pull_parser_init (&parser, stream);
gvm_json_pull_event_init (&event);
ret = update_nvts_from_json_vts (&parser, &event, scanner_feed_version,
ret = update_nvts_from_json_vts (curl_hnd, res, scanner_feed_version,
rebuild);
fclose(stream);
gvm_json_pull_parser_cleanup (&parser);

g_free (res.ptr);
openvasd_curl_handler_close (&curl_hnd);
openvasd_response_free (resp);

if (ret)
Expand Down

0 comments on commit e489632

Please sign in to comment.