Skip to content

Commit

Permalink
Updates unmaintained tokio-retry to tokio-retry2 (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
naomijub authored Sep 21, 2024
1 parent 92c703f commit 5d0bb4d
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 89 deletions.
2 changes: 1 addition & 1 deletion foundation/gax/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ thiserror = "1.0"
tower = { version = "0.4", features = ["filter"] }
http = "1.1"
google-cloud-token = { version = "0.1.2", path = "../token" }
tokio-retry = "0.3"
tokio-retry2 = "0.5.3"
17 changes: 11 additions & 6 deletions foundation/gax/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::future::Future;
use std::iter::Take;
use std::time::Duration;

pub use tokio_retry::strategy::ExponentialBackoff;
pub use tokio_retry::Condition;
use tokio_retry::{Action, RetryIf};
pub use tokio_retry2::strategy::ExponentialBackoff;
use tokio_retry2::{Action, RetryIf};
pub use tokio_retry2::{Condition, MapErr};

use crate::grpc::{Code, Status};

Expand All @@ -21,6 +21,7 @@ impl TryAs<Status> for Status {
pub trait Retry<E: TryAs<Status>, T: Condition<E>> {
fn strategy(&self) -> Take<ExponentialBackoff>;
fn condition(&self) -> T;
fn notify(error: &E, duration: Duration);
}

pub struct CodeCondition {
Expand Down Expand Up @@ -70,6 +71,10 @@ impl Retry<Status, CodeCondition> for RetrySetting {
fn condition(&self) -> CodeCondition {
CodeCondition::new(self.codes.clone())
}

fn notify(_error: &Status, _duration: Duration) {
tracing::trace!("retry fn");
}
}

impl Default for RetrySetting {
Expand All @@ -92,7 +97,7 @@ where
RT: Retry<E, C> + Default,
{
let retry = retry.unwrap_or_default();
RetryIf::spawn(retry.strategy(), action, retry.condition()).await
RetryIf::spawn(retry.strategy(), action, retry.condition(), RT::notify).await
}
/// Repeats retries when the specified error is detected.
/// The argument specified by 'v' can be reused for each retry.
Expand All @@ -117,7 +122,6 @@ where
if retry.condition().should_retry(&status) {
let duration = strategy.next().ok_or(status)?;
tokio::time::sleep(duration).await;
tracing::trace!("retry fn");
} else {
return Err(status);
}
Expand All @@ -128,6 +132,7 @@ where
mod tests {
use std::sync::{Arc, Mutex};

use tokio_retry2::MapErr;
use tonic::{Code, Status};

use crate::retry::{invoke, RetrySetting};
Expand All @@ -140,7 +145,7 @@ mod tests {
let mut lock = counter.lock().unwrap();
*lock += 1;
let result: Result<i32, Status> = Err(Status::new(Code::Aborted, "error"));
result
result.map_transient_err()
};
let actual = invoke(Some(retry), action).await.unwrap_err();
let expected = Status::new(Code::Aborted, "error");
Expand Down
10 changes: 5 additions & 5 deletions foundation/longrunning/src/autogen/operations_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tonic::Response;
use google_cloud_gax::conn::{Channel, Error};
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, RetrySetting};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
use google_cloud_googleapis::longrunning::operations_client::OperationsClient as InternalOperationsClient;
use google_cloud_googleapis::longrunning::{
CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, Operation, WaitOperationRequest,
Expand Down Expand Up @@ -44,7 +44,7 @@ impl OperationsClient {
let name = &req.name;
let action = || async {
let request = create_request(format!("name={name}"), req.clone());
self.inner.clone().get_operation(request).await
self.inner.clone().get_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -62,7 +62,7 @@ impl OperationsClient {
let name = &req.name;
let action = || async {
let request = create_request(format!("name={name}"), req.clone());
self.inner.clone().delete_operation(request).await
self.inner.clone().delete_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -86,7 +86,7 @@ impl OperationsClient {
let name = &req.name;
let action = || async {
let request = create_request(format!("name={name}"), req.clone());
self.inner.clone().cancel_operation(request).await
self.inner.clone().cancel_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -108,7 +108,7 @@ impl OperationsClient {
let setting = retry.unwrap_or_else(default_retry_setting);
let action = || async {
let request = create_request("".to_string(), req.clone());
self.inner.clone().wait_operation(request).await
self.inner.clone().wait_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand Down
48 changes: 30 additions & 18 deletions kms/src/grpc/apiv1/kms_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::grpc::apiv1::conn_pool::ConnectionManager;

use google_cloud_gax::create_request;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, RetrySetting};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};

use crate::grpc::kms::v1::CreateCryptoKeyVersionRequest;
use crate::grpc::kms::v1::CreateKeyRingRequest;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl Client {
) -> Result<GenerateRandomBytesResponse, Status> {
let action = || async {
let request = create_request(format!("location={}", req.location), req.clone());
self.cm.conn().generate_random_bytes(request).await
self.cm.conn().generate_random_bytes(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -81,7 +81,7 @@ impl Client {
) -> Result<CryptoKey, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().create_crypto_key(request).await
self.cm.conn().create_crypto_key(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -100,7 +100,11 @@ impl Client {
) -> Result<CryptoKeyVersion, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().create_crypto_key_version(request).await
self.cm
.conn()
.create_crypto_key_version(request)
.await
.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -119,7 +123,7 @@ impl Client {
) -> Result<KeyRing, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().create_key_ring(request).await
self.cm.conn().create_key_ring(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -138,7 +142,11 @@ impl Client {
) -> Result<CryptoKeyVersion, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().destroy_crypto_key_version(request).await
self.cm
.conn()
.destroy_crypto_key_version(request)
.await
.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -157,7 +165,7 @@ impl Client {
) -> Result<CryptoKey, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_crypto_key(request).await
self.cm.conn().get_crypto_key(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -176,7 +184,7 @@ impl Client {
) -> Result<CryptoKeyVersion, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_crypto_key_version(request).await
self.cm.conn().get_crypto_key_version(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -191,7 +199,7 @@ impl Client {
pub async fn get_key_ring(&self, req: GetKeyRingRequest, retry: Option<RetrySetting>) -> Result<KeyRing, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_key_ring(request).await
self.cm.conn().get_key_ring(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -210,7 +218,11 @@ impl Client {
) -> Result<ListCryptoKeyVersionsResponse, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().list_crypto_key_versions(request).await
self.cm
.conn()
.list_crypto_key_versions(request)
.await
.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -229,7 +241,7 @@ impl Client {
) -> Result<ListCryptoKeysResponse, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().list_crypto_keys(request).await
self.cm.conn().list_crypto_keys(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -248,7 +260,7 @@ impl Client {
) -> Result<ListKeyRingsResponse, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().list_key_rings(request).await
self.cm.conn().list_key_rings(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -263,7 +275,7 @@ impl Client {
pub async fn encrypt(&self, req: EncryptRequest, retry: Option<RetrySetting>) -> Result<EncryptResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().encrypt(request).await
self.cm.conn().encrypt(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -278,7 +290,7 @@ impl Client {
pub async fn decrypt(&self, req: DecryptRequest, retry: Option<RetrySetting>) -> Result<DecryptResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().decrypt(request).await
self.cm.conn().decrypt(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -297,7 +309,7 @@ impl Client {
) -> Result<AsymmetricSignResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().asymmetric_sign(request).await
self.cm.conn().asymmetric_sign(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -312,7 +324,7 @@ impl Client {
pub async fn mac_sign(&self, req: MacSignRequest, retry: Option<RetrySetting>) -> Result<MacSignResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().mac_sign(request).await
self.cm.conn().mac_sign(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -331,7 +343,7 @@ impl Client {
) -> Result<MacVerifyResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().mac_verify(request).await
self.cm.conn().mac_verify(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -350,7 +362,7 @@ impl Client {
) -> Result<PublicKey, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_public_key(request).await
self.cm.conn().get_public_key(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand Down
32 changes: 22 additions & 10 deletions pubsub/src/apiv1/publisher_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use google_cloud_gax::conn::Channel;
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::Response;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, RetrySetting};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
use google_cloud_googleapis::pubsub::v1::publisher_client::PublisherClient as InternalPublisherClient;
use google_cloud_googleapis::pubsub::v1::{
DeleteTopicRequest, DetachSubscriptionRequest, DetachSubscriptionResponse, GetTopicRequest,
Expand Down Expand Up @@ -41,7 +41,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.create_topic(request).await
client.create_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand All @@ -61,7 +61,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.update_topic(request).await
client.update_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand Down Expand Up @@ -92,7 +92,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.publish(request).await
client.publish(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -108,7 +108,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.get_topic(request).await
client.get_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand All @@ -127,7 +127,11 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("project={project}"), req.clone());
client.list_topics(request).await.map(|d| d.into_inner())
client
.list_topics(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.topics.into_iter());
Expand All @@ -152,7 +156,11 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.list_topic_subscriptions(request).await.map(|d| d.into_inner())
client
.list_topic_subscriptions(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.subscriptions.into_iter());
Expand Down Expand Up @@ -181,7 +189,11 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.list_topic_snapshots(request).await.map(|d| d.into_inner())
client
.list_topic_snapshots(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.snapshots.into_iter());
Expand All @@ -207,7 +219,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.delete_topic(request).await
client.delete_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand All @@ -226,7 +238,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("subscription={subscription}"), req.clone());
client.detach_subscription(request).await
client.detach_subscription(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand Down
Loading

0 comments on commit 5d0bb4d

Please sign in to comment.