From 859b60eb43fcde1a0ebbcda401eb533816d10f31 Mon Sep 17 00:00:00 2001 From: Alexander Tesfamichael Date: Mon, 16 Oct 2023 12:49:28 +0200 Subject: [PATCH] feat(migrate-from-gcs): exp backoff for storing --- Cargo.lock | 25 +++++++++++++++++++++++++ Cargo.toml | 2 ++ src/bin/migrate-from-gcs.rs | 17 +++++++++++++++-- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff76b6d..6526da9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,20 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -490,6 +504,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -568,6 +591,8 @@ name = "migrate-payload-archive" version = "0.2.0" dependencies = [ "anyhow", + "backoff", + "bytes", "chrono", "csv", "flate2", diff --git a/Cargo.toml b/Cargo.toml index e571bc7..9abc33a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,8 @@ anyhow = { version = "1.0.75", features = [ "backtrace", "std", ], default-features = false } +backoff = { version = "0.4.0", default-features = false, features = ["tokio"] } +bytes = "1.5.0" chrono = { version = "0.4.31", default-features = false, features = ["std"] } csv = { version = "1.3.0", default-features = false } flate2 = { version = "1.0.28", default-features = false, features = [ diff --git a/src/bin/migrate-from-gcs.rs b/src/bin/migrate-from-gcs.rs index 0057005..962c611 100644 --- a/src/bin/migrate-from-gcs.rs +++ b/src/bin/migrate-from-gcs.rs @@ -8,6 +8,8 @@ use std::{ time::{Duration, SystemTime}, }; +use backoff::{backoff::Backoff, ExponentialBackoff}; +use bytes::Bytes; use chrono::{DateTime, Datelike, Timelike, Utc}; use flate2::{read::GzDecoder, write::GzEncoder, Compression}; use futures::{channel::mpsc::channel, FutureExt, SinkExt, StreamExt, TryStreamExt}; @@ -248,7 +250,7 @@ async fn main() -> anyhow::Result<()> { } }); - const CONCURRENT_PUT_LIMIT: usize = 16; + const CONCURRENT_PUT_LIMIT: usize = 8; slot_bundle_rx .map(Ok) @@ -282,7 +284,18 @@ async fn main() -> anyhow::Result<()> { .await .unwrap(); - ovh.put(&path, bytes_gz.into()).await.unwrap(); + + let mut backoff = ExponentialBackoff::default(); + let bytes_gz_shared = Bytes::from(bytes_gz); + while let Err(err) = ovh.put(&path, bytes_gz_shared.clone()).await { + if let Some(wait) = backoff.next_backoff() { + tokio::time::sleep(wait).await; + continue; + } + eprintln!("failed to execute OVH put operation: {}", err); + break; + } + let payloads_migrated_count = payloads_migrated_counter.fetch_add(payloads_count, std::sync::atomic::Ordering::Relaxed);