diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index ec82ee9278c..de4ee41beaf 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -668,7 +668,7 @@ static int process_blob_chunk(struct flb_azure_blob *ctx, struct flb_event_chunk continue; } - ret = azb_db_file_insert(ctx, source, file_path, file_size); + ret = azb_db_file_insert(ctx, source, ctx->endpoint, file_path, file_size); if (ret == -1) { flb_plg_error(ctx->ins, "cannot insert blob file into database: %s (size=%lu)", file_path, file_size); @@ -708,6 +708,7 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context uint64_t file_delivery_attempts; off_t offset_start; off_t offset_end; + cfl_sds_t file_destination = NULL; cfl_sds_t file_path = NULL; cfl_sds_t part_ids = NULL; cfl_sds_t source = NULL; @@ -876,7 +877,8 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context &offset_start, &offset_end, &part_delivery_attempts, &file_delivery_attempts, - &file_path); + &file_path, + &file_destination); if (ret == -1) { flb_plg_error(ctx->ins, "cannot get next blob file part"); info->active_upload = FLB_FALSE; @@ -891,6 +893,25 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context /* just continue, the row info was retrieved */ } + if (strcmp(file_destination, ctx->endpoint) != 0) { + flb_plg_info(ctx->ins, + "endpoint change detected, restarting file : %s", + file_path); + + info->active_upload = FLB_FALSE; + + /* we need to set the aborted state flag to wait for existing uploads + * to finish and then wipe the slate and start again but we don't want + * to increment the failure count in this case. + */ + azb_db_file_set_aborted_state(ctx, file_id, file_path, 1); + + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); + + flb_sched_timer_cb_coro_return(); + } + /* since this is the first part we want to increment the files * delivery attempt counter. */ @@ -902,20 +923,31 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context ret = flb_utils_read_file_offset(file_path, offset_start, offset_end, &out_buf, &out_size); if (ret == -1) { flb_plg_error(ctx->ins, "cannot read file part %s", file_path); - cfl_sds_destroy(file_path); + info->active_upload = FLB_FALSE; + + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); + flb_sched_timer_cb_coro_return(); } azb_db_file_part_delivery_attempts(ctx, file_id, part_id, ++part_delivery_attempts); flb_plg_debug(ctx->ins, "sending part file %s (id=%" PRIu64 " part_id=%" PRIu64 ")", file_path, id, part_id); + ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_BLOBS, AZURE_BLOB_BLOCKBLOB, file_path, part_id, NULL, 0, out_buf, out_size); + if (ret == FLB_OK) { ret = azb_db_file_part_uploaded(ctx, id); + if (ret == -1) { info->active_upload = FLB_FALSE; + + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); + flb_sched_timer_cb_coro_return(); } } @@ -926,14 +958,16 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context part_delivery_attempts >= ctx->part_delivery_attempt_limit) { azb_db_file_set_aborted_state(ctx, file_id, file_path, 1); } - /* FIXME */ } + info->active_upload = FLB_FALSE; if (out_buf) { flb_free(out_buf); } + cfl_sds_destroy(file_path); + cfl_sds_destroy(file_destination); flb_sched_timer_cb_coro_return(); } diff --git a/plugins/out_azure_blob/azure_blob.h b/plugins/out_azure_blob/azure_blob.h index 43cb5035792..361e348de79 100644 --- a/plugins/out_azure_blob/azure_blob.h +++ b/plugins/out_azure_blob/azure_blob.h @@ -108,6 +108,7 @@ struct flb_azure_blob { sqlite3_stmt *stmt_delete_file; sqlite3_stmt *stmt_abort_file; sqlite3_stmt *stmt_get_file; + sqlite3_stmt *stmt_update_file_destination; sqlite3_stmt *stmt_update_file_delivery_attempt_count; sqlite3_stmt *stmt_set_file_aborted_state; sqlite3_stmt *stmt_get_next_aborted_file; diff --git a/plugins/out_azure_blob/azure_blob_db.c b/plugins/out_azure_blob/azure_blob_db.c index 3de48b15dea..9ba5e0948d3 100644 --- a/plugins/out_azure_blob/azure_blob_db.c +++ b/plugins/out_azure_blob/azure_blob_db.c @@ -56,6 +56,17 @@ static int prepare_stmts(struct flb_sqldb *db, struct flb_azure_blob *ctx) return -1; } + /* file destination update */ + ret = sqlite3_prepare_v2(db->handler, + SQL_UPDATE_FILE_DESTINATION, -1, + &ctx->stmt_update_file_destination, + NULL); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "cannot prepare SQL statement: %s", + SQL_UPDATE_FILE_DESTINATION); + return -1; + } + /* delivery attempt counter update */ ret = sqlite3_prepare_v2(db->handler, SQL_UPDATE_FILE_DELIVERY_ATTEMPT_COUNT, -1, @@ -245,6 +256,7 @@ int azb_db_close(struct flb_azure_blob *ctx) sqlite3_finalize(ctx->stmt_delete_file); sqlite3_finalize(ctx->stmt_set_file_aborted_state); sqlite3_finalize(ctx->stmt_get_file); + sqlite3_finalize(ctx->stmt_update_file_destination); sqlite3_finalize(ctx->stmt_update_file_delivery_attempt_count); sqlite3_finalize(ctx->stmt_get_next_aborted_file); sqlite3_finalize(ctx->stmt_get_next_stale_file); @@ -295,7 +307,10 @@ int azb_db_file_exists(struct flb_azure_blob *ctx, char *path, uint64_t *id) } int64_t azb_db_file_insert(struct flb_azure_blob *ctx, - char *source, char *path, size_t size) + char *source, + char *destination, + char *path, + size_t size) { int ret; int64_t id; @@ -308,9 +323,10 @@ int64_t azb_db_file_insert(struct flb_azure_blob *ctx, /* Bind parameters */ sqlite3_bind_text(ctx->stmt_insert_file, 1, source, -1, 0); - sqlite3_bind_text(ctx->stmt_insert_file, 2, path, -1, 0); - sqlite3_bind_int64(ctx->stmt_insert_file, 3, size); - sqlite3_bind_int64(ctx->stmt_insert_file, 4, created); + sqlite3_bind_text(ctx->stmt_insert_file, 2, destination, -1, 0); + sqlite3_bind_text(ctx->stmt_insert_file, 3, path, -1, 0); + sqlite3_bind_int64(ctx->stmt_insert_file, 4, size); + sqlite3_bind_int64(ctx->stmt_insert_file, 5, created); /* Run the insert */ ret = sqlite3_step(ctx->stmt_insert_file); @@ -404,6 +420,35 @@ int azb_db_file_set_aborted_state(struct flb_azure_blob *ctx, return 0; } +int azb_db_file_change_destination(struct flb_azure_blob *ctx, uint64_t id, cfl_sds_t destination) +{ + int ret; + + azb_db_lock(ctx); + + /* Bind parameters */ + sqlite3_bind_text(ctx->stmt_update_file_destination, 1, destination, -1, 0); + sqlite3_bind_int64(ctx->stmt_update_file_destination, 2, id); + + /* Run the update */ + ret = sqlite3_step(ctx->stmt_update_file_destination); + + sqlite3_clear_bindings(ctx->stmt_update_file_destination); + sqlite3_reset(ctx->stmt_update_file_destination); + + azb_db_unlock(ctx); + + if (ret != SQLITE_DONE) { + flb_plg_error(ctx->ins, + "cannot update file destination " + "count for file id=%" PRIu64, id); + + return -1; + } + + return 0; +} + int azb_db_file_delivery_attempts(struct flb_azure_blob *ctx, uint64_t id, uint64_t attempts) { @@ -680,11 +725,14 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, off_t *offset_start, off_t *offset_end, uint64_t *part_delivery_attempts, uint64_t *file_delivery_attempts, - cfl_sds_t *file_path) + cfl_sds_t *file_path, + cfl_sds_t *destination) { int ret; char *tmp = NULL; + char *tmp_destination = NULL; cfl_sds_t path; + cfl_sds_t local_destination; if (azb_db_lock(ctx) != 0) { return -1; @@ -703,6 +751,7 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, *part_delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_file_part, 5); tmp = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 6); *file_delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_file_part, 7); + tmp_destination = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 8); } else if (ret == SQLITE_DONE) { /* no records */ @@ -719,11 +768,20 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, } path = cfl_sds_create(tmp); + local_destination = cfl_sds_create(tmp_destination); sqlite3_clear_bindings(ctx->stmt_get_next_file_part); sqlite3_reset(ctx->stmt_get_next_file_part); - if (!path) { + if (path == NULL || local_destination == NULL) { + if (path != NULL) { + cfl_sds_destroy(path); + } + + if (local_destination != NULL) { + cfl_sds_destroy(local_destination); + } + azb_db_unlock(ctx); return -1; } @@ -732,11 +790,14 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, ret = azb_db_file_part_in_progress(ctx, 1, *id); if (ret == -1) { cfl_sds_destroy(path); + cfl_sds_destroy(local_destination); azb_db_unlock(ctx); return -1; } *file_path = path; + *destination = local_destination; + azb_db_unlock(ctx); return 1; diff --git a/plugins/out_azure_blob/azure_blob_db.h b/plugins/out_azure_blob/azure_blob_db.h index 475d5ba9301..bbdbc66f1a3 100644 --- a/plugins/out_azure_blob/azure_blob_db.h +++ b/plugins/out_azure_blob/azure_blob_db.h @@ -29,6 +29,7 @@ "CREATE TABLE IF NOT EXISTS out_azure_blob_files (" \ " id INTEGER PRIMARY KEY," \ " source TEXT NOT NULL," \ + " destination TEXT NOT NULL," \ " path TEXT NOT NULL," \ " size INTEGER," \ " created INTEGER," \ @@ -52,8 +53,8 @@ ");" #define SQL_INSERT_FILE \ - "INSERT INTO out_azure_blob_files (source, path, size, created)" \ - " VALUES (@source, @path, @size, @created);" + "INSERT INTO out_azure_blob_files (source, destination, path, size, created)" \ + " VALUES (@source, @destination, @path, @size, @created);" /* DELETE a registered file and all it parts */ #define SQL_DELETE_FILE \ @@ -62,6 +63,9 @@ #define SQL_SET_FILE_ABORTED_STATE \ "UPDATE out_azure_blob_files SET aborted=@state WHERE id=@id;" +#define SQL_UPDATE_FILE_DESTINATION \ + "UPDATE out_azure_blob_files SET destination=@destination WHERE id=@id;" + #define SQL_UPDATE_FILE_DELIVERY_ATTEMPT_COUNT \ "UPDATE out_azure_blob_files " \ " SET delivery_attempts=@delivery_attempts, " \ @@ -121,25 +125,31 @@ " WHERE file_id=@id;" /* Find the oldest files and retrieve the oldest part ready to be uploaded */ -#define SQL_GET_NEXT_FILE_PART \ - "SELECT p.id, " \ - " p.file_id, " \ - " p.part_id, " \ - " p.offset_start, " \ - " p.offset_end, " \ - " p.delivery_attempts, " \ - " f.path, " \ - " f.delivery_attempts, " \ - " f.last_delivery_attempt " \ - "FROM out_azure_blob_parts p " \ - " JOIN out_azure_blob_files f " \ - " ON p.file_id = f.id " \ - "WHERE p.uploaded = 0 " \ - " AND p.in_progress = 0 " \ - " AND f.aborted = 0 " \ - "ORDER BY f.created ASC, " \ - " p.part_id ASC " \ - "LIMIT 1;" +#define SQL_GET_NEXT_FILE_PART \ + " SELECT p.id, " \ + " p.file_id, " \ + " p.part_id, " \ + " p.offset_start, " \ + " p.offset_end, " \ + " p.delivery_attempts, " \ + " f.path, " \ + " f.delivery_attempts, " \ + " f.last_delivery_attempt, " \ + " f.destination " \ + " FROM out_azure_blob_parts p " \ + " JOIN out_azure_blob_files f " \ + " ON p.file_id = f.id " \ + " WHERE p.uploaded = 0 " \ + " AND p.in_progress = 0 " \ + " AND f.aborted = 0 " \ + " AND (p.part_id = 0 OR " \ + " (SELECT sp.uploaded " \ + " FROM out_azure_blob_parts sp " \ + " WHERE sp.part_id = 0 " \ + " AND sp.file_id = p.file_id) = 1) " \ + "ORDER BY f.created ASC, " \ + " p.part_id ASC " \ + " LIMIT 1;" /* @@ -168,7 +178,10 @@ int azb_db_close(struct flb_azure_blob *ctx); int azb_db_file_exists(struct flb_azure_blob *ctx, char *path, uint64_t *id); int64_t azb_db_file_insert(struct flb_azure_blob *ctx, - char *source, char *path, size_t size); + char *source, + char *destination, + char *path, + size_t size); int azb_db_file_delete(struct flb_azure_blob *ctx, uint64_t id, char *path); @@ -176,6 +189,8 @@ int azb_db_file_set_aborted_state(struct flb_azure_blob *ctx, uint64_t id, char *path, uint64_t state); +int azb_db_file_change_destination(struct flb_azure_blob *ctx, uint64_t id, cfl_sds_t destination); + int azb_db_file_delivery_attempts(struct flb_azure_blob *ctx, uint64_t id, uint64_t attempts); int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx, @@ -201,7 +216,8 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx, off_t *offset_start, off_t *offset_end, uint64_t *part_delivery_attempts, uint64_t *file_delivery_attempts, - cfl_sds_t *file_path); + cfl_sds_t *file_path, + cfl_sds_t *destination); int azb_db_file_part_uploaded(struct flb_azure_blob *ctx, uint64_t id); int azb_db_file_part_delivery_attempts(struct flb_azure_blob *ctx, uint64_t file_id,