Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: identify failed vacuum db by id #16553

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2723,25 +2723,17 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
)
.await;

let (seq_db_id, db_meta) = match res {
let (seq_db_id, _db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
}
};

let db_info = Arc::new(DatabaseInfo {
database_id: seq_db_id.data,
name_ident: tenant_dbname.clone(),
meta: db_meta,
});
let table_nivs = get_history_tables_for_gc(
self,
drop_time_range.clone(),
db_info.database_id.db_id,
the_limit,
)
.await?;
let database_id = seq_db_id.data;
let table_nivs =
get_history_tables_for_gc(self, drop_time_range.clone(), database_id.db_id, the_limit)
.await?;

let mut drop_ids = vec![];
let mut vacuum_tables = vec![];
Expand Down
15 changes: 5 additions & 10 deletions src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@ pub async fn do_vacuum_drop_table(
dry_run_limit: Option<usize>,
) -> VacuumDropTablesResult {
let mut list_files = vec![];
let mut failed_dbs = HashSet::new();
let mut failed_tables = HashSet::new();
for (table_info, operator) in tables {
let result =
vacuum_drop_single_table(&table_info, operator, dry_run_limit, &mut list_files).await;
if result.is_err() {
let db_name = table_info.database_name()?;
let table_id = table_info.ident.table_id;
failed_dbs.insert(db_name.to_string());
failed_tables.insert(table_id);
}
}
Ok(if dry_run_limit.is_some() {
(Some(list_files), failed_dbs, failed_tables)
(Some(list_files), failed_tables)
} else {
(None, failed_dbs, failed_tables)
(None, failed_tables)
})
}

Expand Down Expand Up @@ -182,16 +179,14 @@ pub async fn vacuum_drop_tables_by_table_info(
ret_files.extend(files);
}
}
(Some(ret_files), HashSet::new(), HashSet::new())
(Some(ret_files), HashSet::new())
} else {
let mut failed_dbs = HashSet::new();
let mut failed_tables = HashSet::new();
for res in result {
let (_, db, tbl) = res?;
failed_dbs.extend(db);
let (_, tbl) = res?;
failed_tables.extend(tbl);
}
(None, failed_dbs, failed_tables)
(None, failed_tables)
}
};

Expand Down
4 changes: 0 additions & 4 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
let tables = vec![(table_info, operator)];
let result = do_vacuum_drop_table(tables, None).await?;
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());
// verify that accessor.delete() was called
assert!(faulty_accessor.hit_delete_operation());

Expand Down Expand Up @@ -341,7 +340,6 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul

// verify that errors of deletions are not swallowed
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());
}

// Case 2: parallel vacuum dropped tables
Expand All @@ -358,7 +356,6 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul
assert!(faulty_accessor.hit_delete_operation());
// verify that errors of deletions are not swallowed
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());
}

Ok(())
Expand Down Expand Up @@ -433,7 +430,6 @@ async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
let tables = vec![(table_info, operator)];
let result = do_vacuum_drop_table(tables, None).await?;
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());

// verify that accessor.delete() was called
assert!(!accessor.hit_delete_operation());
Expand Down
8 changes: 2 additions & 6 deletions src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ use databend_common_storages_fuse::FuseTable;
// (TableName, file, file size)
pub type VacuumDropFileInfo = (String, String, u64);

// (drop_files, failed_dbs, failed_tables)
pub type VacuumDropTablesResult = Result<(
Option<Vec<VacuumDropFileInfo>>,
HashSet<String>,
HashSet<u64>,
)>;
// (drop_files, failed_tables)
pub type VacuumDropTablesResult = Result<(Option<Vec<VacuumDropFileInfo>>, HashSet<u64>)>;

#[async_trait::async_trait]
pub trait VacuumHandler: Sync + Send {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use chrono::Duration;
Expand Down Expand Up @@ -139,6 +141,14 @@ impl Interpreter for VacuumDropTablesInterpreter {
))
.await?;

// map: table id to its belonging db id
let mut containing_db = BTreeMap::new();
for drop_id in drop_ids.iter() {
if let DroppedId::Table { name, id } = drop_id {
containing_db.insert(id.table_id, name.db_id);
}
}

info!(
"vacuum drop table from db {:?}, get_drop_table_infos return tables: {:?}, drop_ids: {:?}",
self.plan.database,
Expand All @@ -156,7 +166,7 @@ impl Interpreter for VacuumDropTablesInterpreter {

let handler = get_vacuum_handler();
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let (files_opt, failed_dbs, failed_tables) = handler
let (files_opt, failed_tables) = handler
.do_vacuum_drop_tables(
threads_nums,
tables,
Expand All @@ -167,13 +177,20 @@ impl Interpreter for VacuumDropTablesInterpreter {
},
)
.await?;
// gc meta data only when not dry run

let failed_db_ids = failed_tables
.iter()
// Safe unwrap: the map is built from drop_ids
.map(|id| *containing_db.get(id).unwrap())
.collect::<HashSet<_>>();

// gc metadata only when not dry run
if self.plan.option.dry_run.is_none() {
let mut success_dropped_ids = vec![];
for drop_id in drop_ids {
match &drop_id {
DroppedId::Db { db_id: _, db_name } => {
if !failed_dbs.contains(db_name) {
DroppedId::Db { db_id, db_name: _ } => {
if !failed_db_ids.contains(db_id) {
success_dropped_ids.push(drop_id);
}
}
Expand All @@ -186,7 +203,7 @@ impl Interpreter for VacuumDropTablesInterpreter {
}
info!(
"failed dbs:{:?}, failed_tables:{:?}, success_drop_ids:{:?}",
failed_dbs, failed_tables, success_dropped_ids
failed_db_ids, failed_tables, success_dropped_ids
);
self.gc_drop_tables(catalog, success_dropped_ids).await?;
}
Expand Down
Loading