diff --git a/plugins/in_blob/blob.c b/plugins/in_blob/blob.c index c4f5175425d..8081a9a42fe 100644 --- a/plugins/in_blob/blob.c +++ b/plugins/in_blob/blob.c @@ -28,10 +28,13 @@ #include #include +#include #ifndef FLB_SYSTEM_WINDOWS #include #include +#else +#define strtok_r strtok_s #endif #include @@ -190,6 +193,128 @@ static inline int do_glob(const char *pattern, return ret; } +#define MATCHING_METHOD_PRESENCE 0 +#define MATCHING_METHOD_EXACT 1 + +static int apply_glob_pattern(flb_sds_t pattern, char *path) { + int matching_method; + char *strtok_context; + cfl_sds_t local_pattern; + char *filename_part; + char *pattern_part; + char *filename; + int result; + size_t index; + + if (pattern == NULL || pattern[0] == '\0') { + return FLB_FALSE; + } + + if (path == NULL || path[0] == '\0') { + return FLB_FALSE; + } + + filename = NULL; + +#ifdef FLB_SYSTEM_WINDOWS + filename = strrchr(path, '\\'); +#endif + + if (filename == NULL) { + filename = strrchr(path, '/'); + } + + if (filename != NULL) { + filename = &filename[1]; + } + else { + filename = path; + } + + if (filename[0] == '\0') { + return FLB_FALSE; + } + + local_pattern = cfl_sds_create(pattern); + + if (local_pattern == NULL) { + return FLB_FALSE; + } + + result = FLB_FALSE; + + if (strrchr(local_pattern, '*') != NULL) { + index = 0; + pattern_part = strtok_r(local_pattern, "*", &strtok_context); + + if (local_pattern[0] == '*') { + matching_method = MATCHING_METHOD_PRESENCE; + } + else { + matching_method = MATCHING_METHOD_EXACT; + } + + filename_part = filename; + + + while (pattern_part != NULL && filename_part != NULL) { + if (matching_method == MATCHING_METHOD_PRESENCE) { + filename_part = strstr(filename_part, pattern_part); + } + else { + result = strncmp(filename_part, pattern_part, strlen(pattern_part)); + + if (result != 0) { + filename_part = NULL; + } + + matching_method = MATCHING_METHOD_PRESENCE; + } + + if (filename_part != NULL) { + filename_part = &filename_part[strlen(pattern_part)]; + } + + index++; + pattern_part = strtok_r(NULL, "*", &strtok_context); + } + + result = FLB_FALSE; + + if (pattern_part == NULL) { + if (pattern[strlen(pattern) - 1] != '*') { + if (filename_part != NULL) { + if (filename_part[0] == '\0') { + result = FLB_TRUE; + } + } + } + else { + if (filename_part != NULL) { + result = FLB_TRUE; + } + } + } + else { + result = FLB_FALSE; + } + } + else { + result = strcmp(filename, local_pattern); + + if (result == 0) { + result = FLB_TRUE; + } + else { + result = FLB_FALSE; + } + } + + cfl_sds_destroy(local_pattern); + + return result; +} + /* This function recursively searches a directory tree using * a glob compatible pattern that implements the fluentd style * recursion wildcard **. @@ -290,11 +415,20 @@ static ssize_t recursive_file_search(struct blob_ctx *ctx, else if (cfl_sds_len(local_pattern) == 0) { recursive_search_flag = FLB_FALSE; } + else if (cfl_sds_len(local_path) == 0) { + recursive_search_flag = FLB_FALSE; + } memset(&glob_context, 0, sizeof(glob_t)); /* Scan the given path */ - result = do_glob(local_path, GLOB_TILDE | GLOB_ERR, NULL, &glob_context); + if (local_path[0] != '\0') { + result = do_glob(local_path, GLOB_TILDE | GLOB_ERR, NULL, &glob_context); + } + else { + result = do_glob(local_pattern, GLOB_TILDE | GLOB_ERR, NULL, &glob_context); + } + if (result != 0) { switch (result) { case GLOB_NOSPACE: @@ -411,23 +545,29 @@ static ssize_t recursive_file_search(struct blob_ctx *ctx, else if (recursive_search_flag == FLB_FALSE && (S_ISREG(fs_entry_metadata.st_mode) || S_ISLNK(fs_entry_metadata.st_mode))) { - result = blob_file_append(ctx, - glob_context.gl_pathv[index], - &fs_entry_metadata); - if (result == 0) { - flb_plg_debug(ctx->ins, - "blob scan add: %s, inode %" PRIu64, - glob_context.gl_pathv[index], - (uint64_t) fs_entry_metadata.st_ino); - } - else { - flb_plg_debug(ctx->ins, - "blob scan skip: %s", - glob_context.gl_pathv[index]); - } + result = apply_glob_pattern(ctx->exclude_pattern, + (char *) glob_context.gl_pathv[index]); - match_count++; + if (result == FLB_FALSE) { + result = blob_file_append(ctx, + glob_context.gl_pathv[index], + &fs_entry_metadata); + + if (result == 0) { + flb_plg_debug(ctx->ins, + "blob scan add: %s, inode %" PRIu64, + glob_context.gl_pathv[index], + (uint64_t) fs_entry_metadata.st_ino); + } + else { + flb_plg_debug(ctx->ins, + "blob scan skip: %s", + glob_context.gl_pathv[index]); + } + + match_count++; + } } } @@ -843,6 +983,11 @@ static struct flb_config_map config_map[] = { "Path to scan for blob/binary files" }, + { + FLB_CONFIG_MAP_STR, "exclude_pattern", NULL, + 0, FLB_TRUE, offsetof(struct blob_ctx, exclude_pattern), + }, + #ifdef FLB_HAVE_SQLDB { FLB_CONFIG_MAP_STR, "database_file", NULL, diff --git a/plugins/in_blob/blob.h b/plugins/in_blob/blob.h index adedce21d79..5f18ca23b93 100644 --- a/plugins/in_blob/blob.h +++ b/plugins/in_blob/blob.h @@ -73,6 +73,7 @@ struct blob_ctx { /* config map options */ flb_sds_t path; + flb_sds_t exclude_pattern; flb_sds_t database_file; time_t scan_refresh_interval; diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index ec82ee9278c..de4ee41beaf 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -668,7 +668,7 @@ static int process_blob_chunk(struct flb_azure_blob *ctx, struct flb_event_chunk continue; } - ret = azb_db_file_insert(ctx, source, file_path, file_size); + ret = azb_db_file_insert(ctx, source, ctx->endpoint, file_path, file_size); if (ret == -1) { flb_plg_error(ctx->ins, "cannot insert blob file into database: %s (size=%lu)", file_path, file_size); @@ -708,6 +708,7 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context uint64_t file_delivery_attempts; off_t offset_start; off_t offset_end; + cfl_sds_t file_destination = NULL; cfl_sds_t file_path = NULL; cfl_sds_t part_ids = NULL; cfl_sds_t source = NULL; @@ -876,7 +877,8 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context &offset_start, &offset_end, &part_delivery_attempts, &file_delivery_attempts, - &file_path); + &file_path, + &file_destination); if (ret == -1) { flb_plg_error(ctx->ins, "cannot get next blob file part"); info->active_upload = FLB_FALSE; @@ -891,6 +893,25 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context /* just continue, the row info was retrieved */ } + if (strcmp(file_destination, ctx->endpoint) != 0) { + flb_plg_info(ctx->ins, + "endpoint change detected, restarting file : %s", + file_path); + + info->active_upload = FLB_FALSE; + + /* we need to set the aborted state flag to wait for existing uploads + * to finish and then wipe the slate and start again but we don't want + * to increment the failure count in this case. + */ + azb_db_file_set_aborted_state(ctx, file_id, file_path, 1); + + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); + + flb_sched_timer_cb_coro_return(); + } + /* since this is the first part we want to increment the files * delivery attempt counter. */ @@ -902,20 +923,31 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context ret = flb_utils_read_file_offset(file_path, offset_start, offset_end, &out_buf, &out_size); if (ret == -1) { flb_plg_error(ctx->ins, "cannot read file part %s", file_path); - cfl_sds_destroy(file_path); + info->active_upload = FLB_FALSE; + + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); + flb_sched_timer_cb_coro_return(); } azb_db_file_part_delivery_attempts(ctx, file_id, part_id, ++part_delivery_attempts); flb_plg_debug(ctx->ins, "sending part file %s (id=%" PRIu64 " part_id=%" PRIu64 ")", file_path, id, part_id); + ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_BLOBS, AZURE_BLOB_BLOCKBLOB, file_path, part_id, NULL, 0, out_buf, out_size); + if (ret == FLB_OK) { ret = azb_db_file_part_uploaded(ctx, id); + if (ret == -1) { info->active_upload = FLB_FALSE; + + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); + flb_sched_timer_cb_coro_return(); } } @@ -926,14 +958,16 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context part_delivery_attempts >= ctx->part_delivery_attempt_limit) { azb_db_file_set_aborted_state(ctx, file_id, file_path, 1); } - /* FIXME */ } + info->active_upload = FLB_FALSE; if (out_buf) { flb_free(out_buf); } + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); flb_sched_timer_cb_coro_return(); } diff --git a/plugins/out_azure_blob/azure_blob.h b/plugins/out_azure_blob/azure_blob.h index 43cb5035792..361e348de79 100644 --- a/plugins/out_azure_blob/azure_blob.h +++ b/plugins/out_azure_blob/azure_blob.h @@ -108,6 +108,7 @@ struct flb_azure_blob { sqlite3_stmt *stmt_delete_file; sqlite3_stmt *stmt_abort_file; sqlite3_stmt *stmt_get_file; + sqlite3_stmt *stmt_update_file_destination; sqlite3_stmt *stmt_update_file_delivery_attempt_count; sqlite3_stmt *stmt_set_file_aborted_state; sqlite3_stmt *stmt_get_next_aborted_file; diff --git a/plugins/out_azure_blob/azure_blob_db.c b/plugins/out_azure_blob/azure_blob_db.c index 3de48b15dea..9ba5e0948d3 100644 --- a/plugins/out_azure_blob/azure_blob_db.c +++ b/plugins/out_azure_blob/azure_blob_db.c @@ -56,6 +56,17 @@ static int prepare_stmts(struct flb_sqldb *db, struct flb_azure_blob *ctx) return -1; } + /* file destination update */ + ret = sqlite3_prepare_v2(db->handler, + SQL_UPDATE_FILE_DESTINATION, -1, + &ctx->stmt_update_file_destination, + NULL); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "cannot prepare SQL statement: %s", + SQL_UPDATE_FILE_DESTINATION); + return -1; + } + /* delivery attempt counter update */ ret = sqlite3_prepare_v2(db->handler, SQL_UPDATE_FILE_DELIVERY_ATTEMPT_COUNT, -1, @@ -245,6 +256,7 @@ int azb_db_close(struct flb_azure_blob *ctx) sqlite3_finalize(ctx->stmt_delete_file); sqlite3_finalize(ctx->stmt_set_file_aborted_state); sqlite3_finalize(ctx->stmt_get_file); + sqlite3_finalize(ctx->stmt_update_file_destination); sqlite3_finalize(ctx->stmt_update_file_delivery_attempt_count); sqlite3_finalize(ctx->stmt_get_next_aborted_file); sqlite3_finalize(ctx->stmt_get_next_stale_file); @@ -295,7 +307,10 @@ int azb_db_file_exists(struct flb_azure_blob *ctx, char *path, uint64_t *id) } int64_t azb_db_file_insert(struct flb_azure_blob *ctx, - char *source, char *path, size_t size) + char *source, + char *destination, + char *path, + size_t size) { int ret; int64_t id; @@ -308,9 +323,10 @@ int64_t azb_db_file_insert(struct flb_azure_blob *ctx, /* Bind parameters */ sqlite3_bind_text(ctx->stmt_insert_file, 1, source, -1, 0); - sqlite3_bind_text(ctx->stmt_insert_file, 2, path, -1, 0); - sqlite3_bind_int64(ctx->stmt_insert_file, 3, size); - sqlite3_bind_int64(ctx->stmt_insert_file, 4, created); + sqlite3_bind_text(ctx->stmt_insert_file, 2, destination, -1, 0); + sqlite3_bind_text(ctx->stmt_insert_file, 3, path, -1, 0); + sqlite3_bind_int64(ctx->stmt_insert_file, 4, size); + sqlite3_bind_int64(ctx->stmt_insert_file, 5, created); /* Run the insert */ ret = sqlite3_step(ctx->stmt_insert_file); @@ -404,6 +420,35 @@ int azb_db_file_set_aborted_state(struct flb_azure_blob *ctx, return 0; } +int azb_db_file_change_destination(struct flb_azure_blob *ctx, uint64_t id, cfl_sds_t destination) +{ + int ret; + + azb_db_lock(ctx); + + /* Bind parameters */ + sqlite3_bind_text(ctx->stmt_update_file_destination, 1, destination, -1, 0); + sqlite3_bind_int64(ctx->stmt_update_file_destination, 2, id); + + /* Run the update */ + ret = sqlite3_step(ctx->stmt_update_file_destination); + + sqlite3_clear_bindings(ctx->stmt_update_file_destination); + sqlite3_reset(ctx->stmt_update_file_destination); + + azb_db_unlock(ctx); + + if (ret != SQLITE_DONE) { + flb_plg_error(ctx->ins, + "cannot update file destination " + "count for file id=%" PRIu64, id); + + return -1; + } + + return 0; +} + int azb_db_file_delivery_attempts(struct flb_azure_blob *ctx, uint64_t id, uint64_t attempts) { @@ -680,11 +725,14 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, off_t *offset_start, off_t *offset_end, uint64_t *part_delivery_attempts, uint64_t *file_delivery_attempts, - cfl_sds_t *file_path) + cfl_sds_t *file_path, + cfl_sds_t *destination) { int ret; char *tmp = NULL; + char *tmp_destination = NULL; cfl_sds_t path; + cfl_sds_t local_destination; if (azb_db_lock(ctx) != 0) { return -1; @@ -703,6 +751,7 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, *part_delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_file_part, 5); tmp = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 6); *file_delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_file_part, 7); + tmp_destination = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 8); } else if (ret == SQLITE_DONE) { /* no records */ @@ -719,11 +768,20 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, } path = cfl_sds_create(tmp); + local_destination = cfl_sds_create(tmp_destination); sqlite3_clear_bindings(ctx->stmt_get_next_file_part); sqlite3_reset(ctx->stmt_get_next_file_part); - if (!path) { + if (path == NULL || local_destination == NULL) { + if (path != NULL) { + cfl_sds_destroy(path); + } + + if (local_destination != NULL) { + cfl_sds_destroy(local_destination); + } + azb_db_unlock(ctx); return -1; } @@ -732,11 +790,14 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, ret = azb_db_file_part_in_progress(ctx, 1, *id); if (ret == -1) { cfl_sds_destroy(path); + cfl_sds_destroy(local_destination); azb_db_unlock(ctx); return -1; } *file_path = path; + *destination = local_destination; + azb_db_unlock(ctx); return 1; diff --git a/plugins/out_azure_blob/azure_blob_db.h b/plugins/out_azure_blob/azure_blob_db.h index 475d5ba9301..bbdbc66f1a3 100644 --- a/plugins/out_azure_blob/azure_blob_db.h +++ b/plugins/out_azure_blob/azure_blob_db.h @@ -29,6 +29,7 @@ "CREATE TABLE IF NOT EXISTS out_azure_blob_files (" \ " id INTEGER PRIMARY KEY," \ " source TEXT NOT NULL," \ + " destination TEXT NOT NULL," \ " path TEXT NOT NULL," \ " size INTEGER," \ " created INTEGER," \ @@ -52,8 +53,8 @@ ");" #define SQL_INSERT_FILE \ - "INSERT INTO out_azure_blob_files (source, path, size, created)" \ - " VALUES (@source, @path, @size, @created);" + "INSERT INTO out_azure_blob_files (source, destination, path, size, created)" \ + " VALUES (@source, @destination, @path, @size, @created);" /* DELETE a registered file and all it parts */ #define SQL_DELETE_FILE \ @@ -62,6 +63,9 @@ #define SQL_SET_FILE_ABORTED_STATE \ "UPDATE out_azure_blob_files SET aborted=@state WHERE id=@id;" +#define SQL_UPDATE_FILE_DESTINATION \ + "UPDATE out_azure_blob_files SET destination=@destination WHERE id=@id;" + #define SQL_UPDATE_FILE_DELIVERY_ATTEMPT_COUNT \ "UPDATE out_azure_blob_files " \ " SET delivery_attempts=@delivery_attempts, " \ @@ -121,25 +125,31 @@ " WHERE file_id=@id;" /* Find the oldest files and retrieve the oldest part ready to be uploaded */ -#define SQL_GET_NEXT_FILE_PART \ - "SELECT p.id, " \ - " p.file_id, " \ - " p.part_id, " \ - " p.offset_start, " \ - " p.offset_end, " \ - " p.delivery_attempts, " \ - " f.path, " \ - " f.delivery_attempts, " \ - " f.last_delivery_attempt " \ - "FROM out_azure_blob_parts p " \ - " JOIN out_azure_blob_files f " \ - " ON p.file_id = f.id " \ - "WHERE p.uploaded = 0 " \ - " AND p.in_progress = 0 " \ - " AND f.aborted = 0 " \ - "ORDER BY f.created ASC, " \ - " p.part_id ASC " \ - "LIMIT 1;" +#define SQL_GET_NEXT_FILE_PART \ + " SELECT p.id, " \ + " p.file_id, " \ + " p.part_id, " \ + " p.offset_start, " \ + " p.offset_end, " \ + " p.delivery_attempts, " \ + " f.path, " \ + " f.delivery_attempts, " \ + " f.last_delivery_attempt, " \ + " f.destination " \ + " FROM out_azure_blob_parts p " \ + " JOIN out_azure_blob_files f " \ + " ON p.file_id = f.id " \ + " WHERE p.uploaded = 0 " \ + " AND p.in_progress = 0 " \ + " AND f.aborted = 0 " \ + " AND (p.part_id = 0 OR " \ + " (SELECT sp.uploaded " \ + " FROM out_azure_blob_parts sp " \ + " WHERE sp.part_id = 0 " \ + " AND sp.file_id = p.file_id) = 1) " \ + "ORDER BY f.created ASC, " \ + " p.part_id ASC " \ + " LIMIT 1;" /* @@ -168,7 +178,10 @@ int azb_db_close(struct flb_azure_blob *ctx); int azb_db_file_exists(struct flb_azure_blob *ctx, char *path, uint64_t *id); int64_t azb_db_file_insert(struct flb_azure_blob *ctx, - char *source, char *path, size_t size); + char *source, + char *destination, + char *path, + size_t size); int azb_db_file_delete(struct flb_azure_blob *ctx, uint64_t id, char *path); @@ -176,6 +189,8 @@ int azb_db_file_set_aborted_state(struct flb_azure_blob *ctx, uint64_t id, char *path, uint64_t state); +int azb_db_file_change_destination(struct flb_azure_blob *ctx, uint64_t id, cfl_sds_t destination); + int azb_db_file_delivery_attempts(struct flb_azure_blob *ctx, uint64_t id, uint64_t attempts); int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx, @@ -201,7 +216,8 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, off_t *offset_start, off_t *offset_end, uint64_t *part_delivery_attempts, uint64_t *file_delivery_attempts, - cfl_sds_t *file_path); + cfl_sds_t *file_path, + cfl_sds_t *destination); int azb_db_file_part_uploaded(struct flb_azure_blob *ctx, uint64_t id); int azb_db_file_part_delivery_attempts(struct flb_azure_blob *ctx, uint64_t file_id, diff --git a/plugins/out_azure_blob/azure_blob_http.c b/plugins/out_azure_blob/azure_blob_http.c index af891c165c0..c5d26554adb 100644 --- a/plugins/out_azure_blob/azure_blob_http.c +++ b/plugins/out_azure_blob/azure_blob_http.c @@ -180,7 +180,7 @@ flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx, tmp = flb_sds_cat(can_req, "PUT\n", 4); break; case FLB_HTTP_DELETE: - tmp = flb_sds_cat(can_req, "DELETE\n", 4); + tmp = flb_sds_cat(can_req, "DELETE\n", 7); break; };