diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index 37b1f4f6c68..52bf2ed6d40 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -734,6 +734,14 @@ static struct flb_config_map config_map[] = { "provides higher performance. Note that WAL is not compatible with " "shared network file systems." }, + { + FLB_CONFIG_MAP_BOOL, "db.compare_filename", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, compare_filename), + "This option determines whether to check both the inode and the filename " + "when retrieving file information from the db." + "'true' verifies both the inode and filename, while 'false' checks only " + "the inode (default)." + }, #endif /* Multiline Options */ diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h index dcfa54e0264..c0263b46503 100644 --- a/plugins/in_tail/tail_config.h +++ b/plugins/in_tail/tail_config.h @@ -107,6 +107,7 @@ struct flb_tail_config { struct flb_sqldb *db; int db_sync; int db_locking; + int compare_filename; flb_sds_t db_journal_mode; sqlite3_stmt *stmt_get_file; sqlite3_stmt *stmt_insert_file; diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c index 99242f8a15b..ab05d5b9323 100644 --- a/plugins/in_tail/tail_db.c +++ b/plugins/in_tail/tail_db.c @@ -95,9 +95,33 @@ int flb_tail_db_close(struct flb_sqldb *db) return 0; } +static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx, + uint64_t id) +{ + int ret; + + /* Bind parameters */ + sqlite3_bind_int64(ctx->stmt_delete_file, 1, id); + ret = sqlite3_step(ctx->stmt_delete_file); + + sqlite3_clear_bindings(ctx->stmt_delete_file); + sqlite3_reset(ctx->stmt_delete_file); + + if (ret != SQLITE_DONE) { + flb_plg_error(ctx->ins, "db: error deleting stale entry from database:" + " id=%"PRIu64, id); + return -1; + } + + flb_plg_info(ctx->ins, "db: stale file deleted from database:" + " id=%"PRIu64, id); + return 0; +} + /* - * Check if an file inode exists in the database. Return FLB_TRUE or - * FLB_FALSE + * Check if an file inode exists in the database. + * If the 'compare_filename' option is enabled, + * it checks along with the filename. Return FLB_TRUE or FLB_FALSE */ static int db_file_exists(struct flb_tail_file *file, struct flb_tail_config *ctx, @@ -105,6 +129,7 @@ static int db_file_exists(struct flb_tail_file *file, { int ret; int exists = FLB_FALSE; + const unsigned char *name; /* Bind parameters */ sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode); @@ -116,11 +141,26 @@ static int db_file_exists(struct flb_tail_file *file, /* id: column 0 */ *id = sqlite3_column_int64(ctx->stmt_get_file, 0); + /* name: column 1 */ + name = sqlite3_column_text(ctx->stmt_get_file, 1); + /* offset: column 2 */ *offset = sqlite3_column_int64(ctx->stmt_get_file, 2); /* inode: column 3 */ *inode = sqlite3_column_int64(ctx->stmt_get_file, 3); + + /* Checking if the file's name and inode match exactly */ + if (ctx->compare_filename) { + if (flb_tail_target_file_name_cmp((char *) name, file) != 0) { + exists = FLB_FALSE; + flb_plg_info(ctx->ins, "db: exists stale file from database:" + " id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64 + " name=%s file_inode=%"PRIu64" file_name=%s", + *id, *inode, *offset, name, file->inode, + file->name); + } + } } else if (ret == SQLITE_DONE) { /* all good */ @@ -221,6 +261,11 @@ int flb_tail_db_file_set(struct flb_tail_file *file, } if (ret == FLB_FALSE) { + /* Delete stale file of same inode */ + if (ctx->compare_filename && id > 0) { + flb_tail_db_file_delete_by_id(ctx, id); + } + /* Get the database ID for this file */ file->db_id = db_file_insert(file, ctx); } diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index 74accb66ed6..90d8832bc79 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -1733,6 +1733,153 @@ void flb_test_db_delete_stale_file() test_tail_ctx_destroy(ctx); unlink(db); } + +void flb_test_db_compare_filename() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *org_file[] = {"test_db.log"}; + char *moved_file[] = {"test_db_moved.log"}; + char *db = "test_db.db"; + char *msg_init = "hello world"; + char *msg_moved = "hello world moved"; + char *msg_end = "hello db end"; + int i; + int ret; + int num; + int unused; + + unlink(db); + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, + &org_file[0], + sizeof(org_file)/sizeof(char *), + FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", org_file[0], + "read_from_head", "true", + "db", db, + "db.sync", "full", + "db.compare_filename", "true", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg_init, strlen(msg_init)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no output"); + } + + if (ctx->fds != NULL) { + for (i=0; ifd_num; i++) { + close(ctx->fds[i]); + } + flb_free(ctx->fds); + } + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + + /* re-init to use db */ + clear_output_num(); + + /* + * Changing the file name from 'test_db.log' to 'test_db_moved.log.' + * In this scenario, it is assumed that the FluentBit has been terminated, + * and the file has been recreated with the same inode, with offsets equal + * to or greater than the previous file. + */ + ret = rename(org_file[0], moved_file[0]); + TEST_CHECK(ret == 0); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, + &moved_file[0], + sizeof(moved_file)/sizeof(char *), + FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + unlink(db); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", moved_file[0], + "read_from_head", "true", + "db", db, + "db.sync", "full", + "db.compare_filename", "true", + NULL); + TEST_CHECK(ret == 0); + + /* + * Start the engine + * The file has been newly created, and due to the 'db.compare_filename' + * option being set to true, it compares filenames to consider it a new + * file even if the inode is the same. If the option is set to false, + * it can be assumed to be the same file as before. + */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* waiting to flush */ + flb_time_msleep(500); + + ret = write_msg(ctx, msg_moved, strlen(msg_moved)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + ret = write_msg(ctx, msg_end, strlen(msg_end)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 3)) { + /* 3 = msg_init + msg_moved + msg_end */ + TEST_MSG("num error. expect=3 got=%d", num); + } + + test_tail_ctx_destroy(ctx); + unlink(db); +} #endif /* FLB_HAVE_SQLDB */ /* Test list */ @@ -1758,6 +1905,7 @@ TEST_LIST = { #ifdef FLB_HAVE_SQLDB {"db", flb_test_db}, {"db_delete_stale_file", flb_test_db_delete_stale_file}, + {"db_compare_filename", flb_test_db_compare_filename}, #endif #ifdef in_tail