diff --git a/foundation/gax/Cargo.toml b/foundation/gax/Cargo.toml index e9fc8cc8..d625496d 100644 --- a/foundation/gax/Cargo.toml +++ b/foundation/gax/Cargo.toml @@ -17,4 +17,4 @@ thiserror = "1.0" tower = { version = "0.4", features = ["filter"] } http = "1.1" google-cloud-token = { version = "0.1.2", path = "../token" } -tokio-retry = "0.3" +tokio-retry2 = "0.5.3" diff --git a/foundation/gax/src/retry.rs b/foundation/gax/src/retry.rs index 12697719..dc1b4efe 100644 --- a/foundation/gax/src/retry.rs +++ b/foundation/gax/src/retry.rs @@ -2,9 +2,9 @@ use std::future::Future; use std::iter::Take; use std::time::Duration; -pub use tokio_retry::strategy::ExponentialBackoff; -pub use tokio_retry::Condition; -use tokio_retry::{Action, RetryIf}; +pub use tokio_retry2::strategy::ExponentialBackoff; +use tokio_retry2::{Action, RetryIf}; +pub use tokio_retry2::{Condition, MapErr}; use crate::grpc::{Code, Status}; @@ -21,6 +21,7 @@ impl TryAs for Status { pub trait Retry, T: Condition> { fn strategy(&self) -> Take; fn condition(&self) -> T; + fn notify(error: &E, duration: Duration); } pub struct CodeCondition { @@ -70,6 +71,10 @@ impl Retry for RetrySetting { fn condition(&self) -> CodeCondition { CodeCondition::new(self.codes.clone()) } + + fn notify(_error: &Status, _duration: Duration) { + tracing::trace!("retry fn"); + } } impl Default for RetrySetting { @@ -92,7 +97,7 @@ where RT: Retry + Default, { let retry = retry.unwrap_or_default(); - RetryIf::spawn(retry.strategy(), action, retry.condition()).await + RetryIf::spawn(retry.strategy(), action, retry.condition(), RT::notify).await } /// Repeats retries when the specified error is detected. /// The argument specified by 'v' can be reused for each retry. @@ -117,7 +122,6 @@ where if retry.condition().should_retry(&status) { let duration = strategy.next().ok_or(status)?; tokio::time::sleep(duration).await; - tracing::trace!("retry fn"); } else { return Err(status); } @@ -128,6 +132,7 @@ where mod tests { use std::sync::{Arc, Mutex}; + use tokio_retry2::MapErr; use tonic::{Code, Status}; use crate::retry::{invoke, RetrySetting}; @@ -140,7 +145,7 @@ mod tests { let mut lock = counter.lock().unwrap(); *lock += 1; let result: Result = Err(Status::new(Code::Aborted, "error")); - result + result.map_transient_err() }; let actual = invoke(Some(retry), action).await.unwrap_err(); let expected = Status::new(Code::Aborted, "error"); diff --git a/foundation/longrunning/src/autogen/operations_client.rs b/foundation/longrunning/src/autogen/operations_client.rs index 4626afb4..eb4947bb 100644 --- a/foundation/longrunning/src/autogen/operations_client.rs +++ b/foundation/longrunning/src/autogen/operations_client.rs @@ -5,7 +5,7 @@ use tonic::Response; use google_cloud_gax::conn::{Channel, Error}; use google_cloud_gax::create_request; use google_cloud_gax::grpc::{Code, Status}; -use google_cloud_gax::retry::{invoke, RetrySetting}; +use google_cloud_gax::retry::{invoke, MapErr, RetrySetting}; use google_cloud_googleapis::longrunning::operations_client::OperationsClient as InternalOperationsClient; use google_cloud_googleapis::longrunning::{ CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, Operation, WaitOperationRequest, @@ -44,7 +44,7 @@ impl OperationsClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().get_operation(request).await + self.inner.clone().get_operation(request).await.map_transient_err() }; invoke(Some(setting), action).await } @@ -62,7 +62,7 @@ impl OperationsClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().delete_operation(request).await + self.inner.clone().delete_operation(request).await.map_transient_err() }; invoke(Some(setting), action).await } @@ -86,7 +86,7 @@ impl OperationsClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().cancel_operation(request).await + self.inner.clone().cancel_operation(request).await.map_transient_err() }; invoke(Some(setting), action).await } @@ -108,7 +108,7 @@ impl OperationsClient { let setting = retry.unwrap_or_else(default_retry_setting); let action = || async { let request = create_request("".to_string(), req.clone()); - self.inner.clone().wait_operation(request).await + self.inner.clone().wait_operation(request).await.map_transient_err() }; invoke(Some(setting), action).await } diff --git a/kms/src/grpc/apiv1/kms_client.rs b/kms/src/grpc/apiv1/kms_client.rs index c40ba94f..55b56e48 100644 --- a/kms/src/grpc/apiv1/kms_client.rs +++ b/kms/src/grpc/apiv1/kms_client.rs @@ -5,7 +5,7 @@ use crate::grpc::apiv1::conn_pool::ConnectionManager; use google_cloud_gax::create_request; use google_cloud_gax::grpc::{Code, Status}; -use google_cloud_gax::retry::{invoke, RetrySetting}; +use google_cloud_gax::retry::{invoke, MapErr, RetrySetting}; use crate::grpc::kms::v1::CreateCryptoKeyVersionRequest; use crate::grpc::kms::v1::CreateKeyRingRequest; @@ -62,7 +62,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("location={}", req.location), req.clone()); - self.cm.conn().generate_random_bytes(request).await + self.cm.conn().generate_random_bytes(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -81,7 +81,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("parent={}", req.parent), req.clone()); - self.cm.conn().create_crypto_key(request).await + self.cm.conn().create_crypto_key(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -100,7 +100,11 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("parent={}", req.parent), req.clone()); - self.cm.conn().create_crypto_key_version(request).await + self.cm + .conn() + .create_crypto_key_version(request) + .await + .map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -119,7 +123,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("parent={}", req.parent), req.clone()); - self.cm.conn().create_key_ring(request).await + self.cm.conn().create_key_ring(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -138,7 +142,11 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().destroy_crypto_key_version(request).await + self.cm + .conn() + .destroy_crypto_key_version(request) + .await + .map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -157,7 +165,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().get_crypto_key(request).await + self.cm.conn().get_crypto_key(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -176,7 +184,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().get_crypto_key_version(request).await + self.cm.conn().get_crypto_key_version(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -191,7 +199,7 @@ impl Client { pub async fn get_key_ring(&self, req: GetKeyRingRequest, retry: Option) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().get_key_ring(request).await + self.cm.conn().get_key_ring(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -210,7 +218,11 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("parent={}", req.parent), req.clone()); - self.cm.conn().list_crypto_key_versions(request).await + self.cm + .conn() + .list_crypto_key_versions(request) + .await + .map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -229,7 +241,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("parent={}", req.parent), req.clone()); - self.cm.conn().list_crypto_keys(request).await + self.cm.conn().list_crypto_keys(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -248,7 +260,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("parent={}", req.parent), req.clone()); - self.cm.conn().list_key_rings(request).await + self.cm.conn().list_key_rings(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -263,7 +275,7 @@ impl Client { pub async fn encrypt(&self, req: EncryptRequest, retry: Option) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().encrypt(request).await + self.cm.conn().encrypt(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -278,7 +290,7 @@ impl Client { pub async fn decrypt(&self, req: DecryptRequest, retry: Option) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().decrypt(request).await + self.cm.conn().decrypt(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -297,7 +309,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().asymmetric_sign(request).await + self.cm.conn().asymmetric_sign(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -312,7 +324,7 @@ impl Client { pub async fn mac_sign(&self, req: MacSignRequest, retry: Option) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().mac_sign(request).await + self.cm.conn().mac_sign(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -331,7 +343,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().mac_verify(request).await + self.cm.conn().mac_verify(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await @@ -350,7 +362,7 @@ impl Client { ) -> Result { let action = || async { let request = create_request(format!("name={}", req.name), req.clone()); - self.cm.conn().get_public_key(request).await + self.cm.conn().get_public_key(request).await.map_transient_err() }; invoke(Some(retry.unwrap_or_else(default_setting)), action) .await diff --git a/pubsub/src/apiv1/publisher_client.rs b/pubsub/src/apiv1/publisher_client.rs index f7911a24..8837135c 100644 --- a/pubsub/src/apiv1/publisher_client.rs +++ b/pubsub/src/apiv1/publisher_client.rs @@ -4,7 +4,7 @@ use google_cloud_gax::conn::Channel; use google_cloud_gax::create_request; use google_cloud_gax::grpc::Response; use google_cloud_gax::grpc::{Code, Status}; -use google_cloud_gax::retry::{invoke, RetrySetting}; +use google_cloud_gax::retry::{invoke, MapErr, RetrySetting}; use google_cloud_googleapis::pubsub::v1::publisher_client::PublisherClient as InternalPublisherClient; use google_cloud_googleapis::pubsub::v1::{ DeleteTopicRequest, DetachSubscriptionRequest, DetachSubscriptionResponse, GetTopicRequest, @@ -41,7 +41,7 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("name={name}"), req.clone()); - client.create_topic(request).await + client.create_topic(request).await.map_transient_err() }; invoke(retry, action).await } @@ -61,7 +61,7 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("name={name}"), req.clone()); - client.update_topic(request).await + client.update_topic(request).await.map_transient_err() }; invoke(retry, action).await } @@ -92,7 +92,7 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("name={name}"), req.clone()); - client.publish(request).await + client.publish(request).await.map_transient_err() }; invoke(Some(setting), action).await } @@ -108,7 +108,7 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("topic={topic}"), req.clone()); - client.get_topic(request).await + client.get_topic(request).await.map_transient_err() }; invoke(retry, action).await } @@ -127,7 +127,11 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("project={project}"), req.clone()); - client.list_topics(request).await.map(|d| d.into_inner()) + client + .list_topics(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all.extend(response.topics.into_iter()); @@ -152,7 +156,11 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("topic={topic}"), req.clone()); - client.list_topic_subscriptions(request).await.map(|d| d.into_inner()) + client + .list_topic_subscriptions(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all.extend(response.subscriptions.into_iter()); @@ -181,7 +189,11 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("topic={topic}"), req.clone()); - client.list_topic_snapshots(request).await.map(|d| d.into_inner()) + client + .list_topic_snapshots(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all.extend(response.snapshots.into_iter()); @@ -207,7 +219,7 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("topic={topic}"), req.clone()); - client.delete_topic(request).await + client.delete_topic(request).await.map_transient_err() }; invoke(retry, action).await } @@ -226,7 +238,7 @@ impl PublisherClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.detach_subscription(request).await + client.detach_subscription(request).await.map_transient_err() }; invoke(retry, action).await } diff --git a/pubsub/src/apiv1/schema_client.rs b/pubsub/src/apiv1/schema_client.rs index 5c2a7d90..4b755475 100644 --- a/pubsub/src/apiv1/schema_client.rs +++ b/pubsub/src/apiv1/schema_client.rs @@ -4,7 +4,7 @@ use google_cloud_gax::conn::Channel; use google_cloud_gax::create_request; use google_cloud_gax::grpc::Response; use google_cloud_gax::grpc::Status; -use google_cloud_gax::retry::{invoke, RetrySetting}; +use google_cloud_gax::retry::{invoke, MapErr, RetrySetting}; use google_cloud_googleapis::pubsub::v1::schema_service_client::SchemaServiceClient; use google_cloud_googleapis::pubsub::v1::{ CreateSchemaRequest, DeleteSchemaRequest, GetSchemaRequest, ListSchemasRequest, Schema, ValidateMessageRequest, @@ -39,7 +39,7 @@ impl SchemaClient { let action = || async { let mut client = self.client(); let request = create_request(format!("parent={parent}"), req.clone()); - client.create_schema(request).await + client.create_schema(request).await.map_transient_err() }; invoke(retry, action).await } @@ -54,7 +54,7 @@ impl SchemaClient { let action = || async { let mut client = self.client(); let request = create_request(format!("name={name}"), req.clone()); - client.get_schema(request).await + client.get_schema(request).await.map_transient_err() }; invoke(retry, action).await } @@ -72,7 +72,11 @@ impl SchemaClient { let action = || async { let mut client = self.client(); let request = create_request(format!("project={project}"), req.clone()); - client.list_schemas(request).await.map(|d| d.into_inner()) + client + .list_schemas(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all.extend(response.schemas.into_iter()); @@ -93,7 +97,7 @@ impl SchemaClient { let action = || async { let mut client = self.client(); let request = create_request(format!("name={name}"), req.clone()); - client.delete_schema(request).await + client.delete_schema(request).await.map_transient_err() }; invoke(retry, action).await } @@ -108,7 +112,7 @@ impl SchemaClient { let action = || async { let mut client = self.client(); let request = create_request(format!("parent={parent}"), req.clone()); - client.validate_schema(request).await + client.validate_schema(request).await.map_transient_err() }; invoke(retry, action).await } @@ -123,7 +127,7 @@ impl SchemaClient { let action = || async { let mut client = self.client(); let request = create_request(format!("parent={parent}"), req.clone()); - client.validate_message(request).await + client.validate_message(request).await.map_transient_err() }; invoke(retry, action).await } diff --git a/pubsub/src/apiv1/subscriber_client.rs b/pubsub/src/apiv1/subscriber_client.rs index 9b70d496..b969455d 100644 --- a/pubsub/src/apiv1/subscriber_client.rs +++ b/pubsub/src/apiv1/subscriber_client.rs @@ -4,7 +4,7 @@ use google_cloud_gax::conn::Channel; use google_cloud_gax::create_request; use google_cloud_gax::grpc::Status; use google_cloud_gax::grpc::{IntoStreamingRequest, Response, Streaming}; -use google_cloud_gax::retry::{invoke, RetrySetting}; +use google_cloud_gax::retry::{invoke, MapErr, RetrySetting}; use google_cloud_googleapis::pubsub::v1::subscriber_client::SubscriberClient as InternalSubscriberClient; use google_cloud_googleapis::pubsub::v1::{ AcknowledgeRequest, CreateSnapshotRequest, DeleteSnapshotRequest, DeleteSubscriptionRequest, GetSnapshotRequest, @@ -85,7 +85,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("name={name}"), req.clone()); - client.create_subscription(request).await + client.create_subscription(request).await.map_transient_err() }; invoke(retry, action).await } @@ -105,7 +105,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription.name={name}"), req.clone()); - client.update_subscription(request).await + client.update_subscription(request).await.map_transient_err() }; invoke(retry, action).await } @@ -121,7 +121,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.get_subscription(request).await + client.get_subscription(request).await.map_transient_err() }; invoke(retry, action).await } @@ -140,7 +140,11 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("project={project}"), req.clone()); - client.list_subscriptions(request).await.map(|d| d.into_inner()) + client + .list_subscriptions(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response: ListSubscriptionsResponse = invoke(retry.clone(), action).await?; all.extend(response.subscriptions.into_iter()); @@ -166,7 +170,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.delete_subscription(request).await + client.delete_subscription(request).await.map_transient_err() }; invoke(retry, action).await } @@ -186,7 +190,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.modify_ack_deadline(request).await + client.modify_ack_deadline(request).await.map_transient_err() }; invoke(retry, action).await } @@ -208,7 +212,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.acknowledge(request).await + client.acknowledge(request).await.map_transient_err() }; invoke(retry, action).await } @@ -222,7 +226,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.pull(request).await + client.pull(request).await.map_transient_err() }; invoke(retry, action).await } @@ -260,7 +264,7 @@ impl SubscriberClient { "x-goog-request-params", format!("subscription={}", req.subscription).parse().unwrap(), ); - client.streaming_pull(v).await + client.streaming_pull(v).await.map_transient_err() }; invoke(retry, action).await } @@ -281,7 +285,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.modify_push_config(request).await + client.modify_push_config(request).await.map_transient_err() }; invoke(retry, action).await } @@ -301,7 +305,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("snapshot={snapshot}"), req.clone()); - client.get_snapshot(request).await + client.get_snapshot(request).await.map_transient_err() }; invoke(retry, action).await } @@ -323,7 +327,11 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("project={project}"), req.clone()); - client.list_snapshots(request).await.map(|d| d.into_inner()) + client + .list_snapshots(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response: ListSnapshotsResponse = invoke(retry.clone(), action).await?; all.extend(response.snapshots.into_iter()); @@ -360,7 +368,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("name={name}"), req.clone()); - client.create_snapshot(request).await + client.create_snapshot(request).await.map_transient_err() }; invoke(retry, action).await } @@ -384,7 +392,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("snapshot.name={name}"), req.clone()); - client.update_snapshot(request).await + client.update_snapshot(request).await.map_transient_err() }; invoke(retry, action).await } @@ -408,7 +416,7 @@ impl SubscriberClient { let action = || async { let mut client = self.client(); let request = create_request(format!("snapshot={name}"), req.clone()); - client.delete_snapshot(request).await + client.delete_snapshot(request).await.map_transient_err() }; invoke(retry, action).await } @@ -420,7 +428,7 @@ impl SubscriberClient { let mut client = self.client(); let subscription = req.subscription.clone(); let request = create_request(format!("subscription={subscription}"), req.clone()); - client.seek(request).await + client.seek(request).await.map_transient_err() }; invoke(retry, action).await } diff --git a/spanner/src/admin/database/database_admin_client.rs b/spanner/src/admin/database/database_admin_client.rs index d705f0a8..b98716cf 100644 --- a/spanner/src/admin/database/database_admin_client.rs +++ b/spanner/src/admin/database/database_admin_client.rs @@ -1,7 +1,7 @@ use google_cloud_gax::conn::Channel; use google_cloud_gax::create_request; use google_cloud_gax::grpc::{Response, Status}; -use google_cloud_gax::retry::{invoke, RetrySetting}; +use google_cloud_gax::retry::{invoke, MapErr, RetrySetting}; use google_cloud_googleapis::iam::v1::{ GetIamPolicyRequest, Policy, SetIamPolicyRequest, TestIamPermissionsRequest, TestIamPermissionsResponse, }; @@ -46,7 +46,12 @@ impl DatabaseAdminClient { loop { let action = || async { let request = create_request(format!("parent={parent}"), req.clone()); - self.inner.clone().list_databases(request).await.map(|d| d.into_inner()) + self.inner + .clone() + .list_databases(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all_databases.extend(response.databases.into_iter()); @@ -72,7 +77,7 @@ impl DatabaseAdminClient { let parent = &req.parent; let action = || async { let request = create_request(format!("parent={parent}"), req.clone()); - self.inner.clone().create_database(request).await + self.inner.clone().create_database(request).await.map_transient_err() }; invoke(retry, action) .await @@ -90,7 +95,7 @@ impl DatabaseAdminClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().get_database(request).await + self.inner.clone().get_database(request).await.map_transient_err() }; invoke(retry, action).await } @@ -113,7 +118,11 @@ impl DatabaseAdminClient { let database = &req.database; let action = || async { let request = create_request(format!("database={database}"), req.clone()); - self.inner.clone().update_database_ddl(request).await + self.inner + .clone() + .update_database_ddl(request) + .await + .map_transient_err() }; invoke(retry, action) .await @@ -133,7 +142,7 @@ impl DatabaseAdminClient { let database = &req.database; let action = || async { let request = create_request(format!("database={database}"), req.clone()); - self.inner.clone().drop_database(request).await + self.inner.clone().drop_database(request).await.map_transient_err() }; invoke(retry, action).await } @@ -151,7 +160,7 @@ impl DatabaseAdminClient { let database = &req.database; let action = || async { let request = create_request(format!("database={database}"), req.clone()); - self.inner.clone().get_database_ddl(request).await + self.inner.clone().get_database_ddl(request).await.map_transient_err() }; invoke(retry, action).await } @@ -173,7 +182,7 @@ impl DatabaseAdminClient { let resource = &req.resource; let action = || async { let request = create_request(format!("resource={resource}"), req.clone()); - self.inner.clone().set_iam_policy(request).await + self.inner.clone().set_iam_policy(request).await.map_transient_err() }; invoke(retry, action).await } @@ -196,7 +205,7 @@ impl DatabaseAdminClient { let resource = &req.resource; let action = || async { let request = create_request(format!("resource={resource}"), req.clone()); - self.inner.clone().get_iam_policy(request).await + self.inner.clone().get_iam_policy(request).await.map_transient_err() }; invoke(retry, action).await } @@ -221,7 +230,11 @@ impl DatabaseAdminClient { let resource = &req.resource; let action = || async { let request = create_request(format!("resource={resource}"), req.clone()); - self.inner.clone().test_iam_permissions(request).await + self.inner + .clone() + .test_iam_permissions(request) + .await + .map_transient_err() }; invoke(retry, action).await } @@ -248,7 +261,7 @@ impl DatabaseAdminClient { let parent = &req.parent; let action = || async { let request = create_request(format!("parent={parent}"), req.clone()); - self.inner.clone().create_backup(request).await + self.inner.clone().create_backup(request).await.map_transient_err() }; invoke(retry, action) .await @@ -266,7 +279,7 @@ impl DatabaseAdminClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().get_backup(request).await + self.inner.clone().get_backup(request).await.map_transient_err() }; invoke(retry, action).await } @@ -282,7 +295,7 @@ impl DatabaseAdminClient { let name = &req.backup.as_ref().unwrap().name; let action = || async { let request = create_request(format!("backup.name={name}"), req.clone()); - self.inner.clone().update_backup(request).await + self.inner.clone().update_backup(request).await.map_transient_err() }; invoke(retry, action).await } @@ -298,7 +311,7 @@ impl DatabaseAdminClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().delete_backup(request).await + self.inner.clone().delete_backup(request).await.map_transient_err() }; invoke(retry, action).await } @@ -319,7 +332,12 @@ impl DatabaseAdminClient { loop { let action = || async { let request = create_request(format!("parent={parent}"), req.clone()); - self.inner.clone().list_backups(request).await.map(|d| d.into_inner()) + self.inner + .clone() + .list_backups(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all_backups.extend(response.backups.into_iter()); @@ -357,7 +375,7 @@ impl DatabaseAdminClient { let parent = &req.parent; let action = || async { let request = create_request(format!("parent={parent}"), req.clone()); - self.inner.clone().restore_database(request).await + self.inner.clone().restore_database(request).await.map_transient_err() }; invoke(retry, action) .await @@ -392,6 +410,7 @@ impl DatabaseAdminClient { .list_backup_operations(request) .await .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all_operations.extend(response.operations.into_iter()); @@ -428,6 +447,7 @@ impl DatabaseAdminClient { .list_database_operations(request) .await .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all_operations.extend(response.operations.into_iter()); diff --git a/spanner/src/admin/instance/instance_admin_client.rs b/spanner/src/admin/instance/instance_admin_client.rs index 6a2aaaaa..98c161d6 100644 --- a/spanner/src/admin/instance/instance_admin_client.rs +++ b/spanner/src/admin/instance/instance_admin_client.rs @@ -1,7 +1,7 @@ use google_cloud_gax::conn::Channel; use google_cloud_gax::create_request; use google_cloud_gax::grpc::{Response, Status}; -use google_cloud_gax::retry::{invoke, RetrySetting}; +use google_cloud_gax::retry::{invoke, MapErr, RetrySetting}; use google_cloud_googleapis::iam::v1::{ GetIamPolicyRequest, Policy, SetIamPolicyRequest, TestIamPermissionsRequest, TestIamPermissionsResponse, }; @@ -48,6 +48,7 @@ impl InstanceAdminClient { .list_instance_configs(request) .await .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all.extend(response.instance_configs.into_iter()); @@ -74,6 +75,7 @@ impl InstanceAdminClient { .get_instance_config(request) .await .map(|d| d.into_inner()) + .map_transient_err() }; invoke(retry, action).await } @@ -92,7 +94,12 @@ impl InstanceAdminClient { loop { let action = || async { let request = create_request(format!("parent={parent}"), req.clone()); - self.inner.clone().list_instances(request).await.map(|d| d.into_inner()) + self.inner + .clone() + .list_instances(request) + .await + .map(|d| d.into_inner()) + .map_transient_err() }; let response = invoke(retry.clone(), action).await?; all.extend(response.instances.into_iter()); @@ -114,7 +121,7 @@ impl InstanceAdminClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().get_instance(request).await + self.inner.clone().get_instance(request).await.map_transient_err() }; invoke(retry, action).await } @@ -168,7 +175,7 @@ impl InstanceAdminClient { let parent = &req.parent; let action = || async { let request = create_request(format!("parent={parent}"), req.clone()); - self.inner.clone().create_instance(request).await + self.inner.clone().create_instance(request).await.map_transient_err() }; invoke(retry, action) .await @@ -229,7 +236,7 @@ impl InstanceAdminClient { let instance_name = &req.instance.as_ref().unwrap().name; let action = || async { let request = create_request(format!("instance.name={instance_name}"), req.clone()); - self.inner.clone().update_instance(request).await + self.inner.clone().update_instance(request).await.map_transient_err() }; invoke(retry, action) .await @@ -257,7 +264,7 @@ impl InstanceAdminClient { let name = &req.name; let action = || async { let request = create_request(format!("name={name}"), req.clone()); - self.inner.clone().delete_instance(request).await + self.inner.clone().delete_instance(request).await.map_transient_err() }; invoke(retry, action).await } @@ -276,7 +283,7 @@ impl InstanceAdminClient { let retry = Some(retry.unwrap_or_else(default_retry_setting)); let action = || async { let request = create_request(format!("resource={resource}"), req.clone()); - self.inner.clone().set_iam_policy(request).await + self.inner.clone().set_iam_policy(request).await.map_transient_err() }; invoke(retry, action).await } @@ -295,7 +302,7 @@ impl InstanceAdminClient { let retry = Some(retry.unwrap_or_else(default_retry_setting)); let action = || async { let request = create_request(format!("resource={resource}"), req.clone()); - self.inner.clone().get_iam_policy(request).await + self.inner.clone().get_iam_policy(request).await.map_transient_err() }; invoke(retry, action).await } @@ -316,7 +323,11 @@ impl InstanceAdminClient { let retry = Some(retry.unwrap_or_else(default_retry_setting)); let action = || async { let request = create_request(format!("resource={resource}"), req.clone()); - self.inner.clone().test_iam_permissions(request).await + self.inner + .clone() + .test_iam_permissions(request) + .await + .map_transient_err() }; invoke(retry, action).await } diff --git a/spanner/src/retry.rs b/spanner/src/retry.rs index 2155d347..1d6271fc 100644 --- a/spanner/src/retry.rs +++ b/spanner/src/retry.rs @@ -100,6 +100,12 @@ where _marker: PhantomData, } } + + fn notify(error: &E, duration: std::time::Duration) { + if let Some(status) = error.try_as() { + tracing::trace!("transaction retry fn, error: {:?}, duration: {:?}", status, duration); + }; + } } impl TransactionRetrySetting {