Skip to content

Commit

Permalink
[nexus] Make most transactions automatically retry (#4487)
Browse files Browse the repository at this point in the history
Integrates automatic transaction retry into Nexus for most transactions.

Additionally, this PR provides a "RetryHelper" object to help
standardize how transaction retry is performed.
Currently, after a short randomized wait (up to an upper bound), we
retry unconditionally, emitting each
attempt to Oximeter for further analysis.

- [x] Depends on
oxidecomputer/async-bb8-diesel#58
- [x] As noted in
oxidecomputer/async-bb8-diesel#58, this will
require customizing CRDB session variables to work correctly. (Edit:
this is done on each transaction)


Part of oxidecomputer/customer-support#46 
Part of #3814
  • Loading branch information
smklein authored Dec 6, 2023
1 parent d525dee commit b3d641a
Show file tree
Hide file tree
Showing 40 changed files with 3,003 additions and 2,446 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ api_identity = { path = "api_identity" }
approx = "0.5.1"
assert_matches = "1.5.0"
assert_cmd = "2.0.12"
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "1446f7e0c1f05f33a0581abd51fa873c7652ab61" }
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "ed7ab5ef0513ba303d33efd41d3e9e381169d59b" }
async-trait = "0.1.74"
atomicwrites = "0.4.2"
authz-macros = { path = "nexus/authz-macros" }
Expand Down
2 changes: 1 addition & 1 deletion nexus/db-model/src/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl SledUpdate {
}

/// A set of constraints that can be placed on operations that select a sled.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SledReservationConstraints {
must_select_from: Vec<Uuid>,
}
Expand Down
1 change: 1 addition & 0 deletions nexus/db-queries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ oso.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
pq-sys = "*"
rand.workspace = true
ref-cast.workspace = true
samael.workspace = true
serde.workspace = true
Expand Down
27 changes: 7 additions & 20 deletions nexus/db-queries/src/db/collection_attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,9 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::db::{
self, error::TransactionError, identity::Resource as IdentityResource,
};
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
ConnectionManager,
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use chrono::Utc;
use db_macros::Resource;
Expand Down Expand Up @@ -999,22 +996,12 @@ mod test {
.set(resource::dsl::collection_id.eq(collection_id)),
);

type TxnError =
TransactionError<AttachError<Resource, Collection, DieselError>>;
let result = conn
.transaction_async(|conn| async move {
attach_query.attach_and_get_result_async(&conn).await.map_err(
|e| match e {
AttachError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
},
)
})
.await;

// "attach_and_get_result" should return the "attached" resource.
let (returned_collection, returned_resource) =
result.expect("Attach should have worked");
let (returned_collection, returned_resource) = attach_query
.attach_and_get_result_async(&conn)
.await
.expect("Attach should have worked");

assert_eq!(
returned_resource.collection_id.expect("Expected a collection ID"),
collection_id
Expand Down
26 changes: 7 additions & 19 deletions nexus/db-queries/src/db/collection_detach_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,9 @@ where
mod test {
use super::*;
use crate::db::collection_attach::DatastoreAttachTarget;
use crate::db::{
self, error::TransactionError, identity::Resource as IdentityResource,
};
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
ConnectionManager,
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use chrono::Utc;
use db_macros::Resource;
Expand Down Expand Up @@ -919,21 +916,12 @@ mod test {
.set(resource::dsl::collection_id.eq(Option::<Uuid>::None)),
);

type TxnError =
TransactionError<DetachManyError<Collection, DieselError>>;
let result = conn
.transaction_async(|conn| async move {
detach_query.detach_and_get_result_async(&conn).await.map_err(
|e| match e {
DetachManyError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
},
)
})
.await;

// "detach_and_get_result" should return the "detached" resource.
let returned_collection = result.expect("Detach should have worked");
let returned_collection = detach_query
.detach_and_get_result_async(&conn)
.await
.expect("Detach should have worked");

// The returned values should be the latest value in the DB.
assert_eq!(
returned_collection,
Expand Down
103 changes: 55 additions & 48 deletions nexus/db-queries/src/db/datastore/address_lot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use crate::db::error::TransactionError;
use crate::db::model::Name;
use crate::db::model::{AddressLot, AddressLotBlock, AddressLotReservedBlock};
use crate::db::pagination::paginated;
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl, Connection};
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::{AsyncRunQueryDsl, Connection};
use chrono::Utc;
use diesel::result::Error as DieselError;
use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
use diesel_dtrace::DTraceConnection;
use ipnetwork::IpNetwork;
Expand Down Expand Up @@ -45,11 +45,12 @@ impl DataStore {
use db::schema::address_lot::dsl as lot_dsl;
use db::schema::address_lot_block::dsl as block_dsl;

self.pool_connection_authorized(opctx)
.await?
// TODO https://github.com/oxidecomputer/omicron/issues/2811
// Audit external networking database transaction usage
.transaction_async(|conn| async move {
let conn = self.pool_connection_authorized(opctx).await?;

// TODO https://github.com/oxidecomputer/omicron/issues/2811
// Audit external networking database transaction usage
self.transaction_retry_wrapper("address_lot_create")
.transaction(&conn, |conn| async move {
let lot = AddressLot::new(&params.identity, params.kind.into());

let db_lot: AddressLot =
Expand Down Expand Up @@ -81,15 +82,14 @@ impl DataStore {
Ok(AddressLotCreateResult { lot: db_lot, blocks: db_blocks })
})
.await
.map_err(|e| match e {
DieselError::DatabaseError(_, _) => public_error_from_diesel(
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::Conflict(
ResourceType::AddressLot,
&params.identity.name.as_str(),
),
),
_ => public_error_from_diesel(e, ErrorHandler::Server),
)
})
}

Expand All @@ -113,47 +113,54 @@ impl DataStore {
LotInUse,
}

type TxnError = TransactionError<AddressLotDeleteError>;
let err = OptionalError::new();

// TODO https://github.com/oxidecomputer/omicron/issues/2811
// Audit external networking database transaction usage
conn.transaction_async(|conn| async move {
let rsvd: Vec<AddressLotReservedBlock> =
rsvd_block_dsl::address_lot_rsvd_block
.filter(rsvd_block_dsl::address_lot_id.eq(id))
.select(AddressLotReservedBlock::as_select())
.limit(1)
.load_async(&conn)
.await?;

if !rsvd.is_empty() {
Err(TxnError::CustomError(AddressLotDeleteError::LotInUse))?;
}
self.transaction_retry_wrapper("address_lot_delete")
.transaction(&conn, |conn| {
let err = err.clone();
async move {
let rsvd: Vec<AddressLotReservedBlock> =
rsvd_block_dsl::address_lot_rsvd_block
.filter(rsvd_block_dsl::address_lot_id.eq(id))
.select(AddressLotReservedBlock::as_select())
.limit(1)
.load_async(&conn)
.await?;

if !rsvd.is_empty() {
return Err(err.bail(AddressLotDeleteError::LotInUse));
}

let now = Utc::now();
diesel::update(lot_dsl::address_lot)
.filter(lot_dsl::time_deleted.is_null())
.filter(lot_dsl::id.eq(id))
.set(lot_dsl::time_deleted.eq(now))
.execute_async(&conn)
.await?;

let now = Utc::now();
diesel::update(lot_dsl::address_lot)
.filter(lot_dsl::time_deleted.is_null())
.filter(lot_dsl::id.eq(id))
.set(lot_dsl::time_deleted.eq(now))
.execute_async(&conn)
.await?;

diesel::delete(block_dsl::address_lot_block)
.filter(block_dsl::address_lot_id.eq(id))
.execute_async(&conn)
.await?;

Ok(())
})
.await
.map_err(|e| match e {
TxnError::Database(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
TxnError::CustomError(AddressLotDeleteError::LotInUse) => {
Error::invalid_request("lot is in use")
}
})
diesel::delete(block_dsl::address_lot_block)
.filter(block_dsl::address_lot_id.eq(id))
.execute_async(&conn)
.await?;

Ok(())
}
})
.await
.map_err(|e| {
if let Some(err) = err.take() {
match err {
AddressLotDeleteError::LotInUse => {
Error::invalid_request("lot is in use")
}
}
} else {
public_error_from_diesel(e, ErrorHandler::Server)
}
})
}

pub async fn address_lot_list(
Expand Down
Loading

0 comments on commit b3d641a

Please sign in to comment.