Skip to content

Commit

Permalink
Tweak retry logic
Browse files Browse the repository at this point in the history
- Use exponential backoff.
- Make max retries configurable.
  • Loading branch information
emk committed Jun 29, 2022
1 parent e8b7082 commit 3ff51c8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
22 changes: 18 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ struct Opt {
#[structopt(long = "max-addresses-per-second")]
max_addresses_per_second: Option<usize>,

/// How many times should we retry a failed geocoding block? The wait before
/// each retry is twice as long as the previous one, starting at 2 seconds.
/// The current default value will result in giving up after about 30 seconds.
#[structopt(long = "max-retries", default_value = "4")]
max_retries: u8,

/// Labels to attach to reported metrics. Recommended: "source=$SOURCE".
#[structopt(long = "metrics-label", value_name = "KEY=VALUE")]
metrics_labels: Vec<MetricsLabel>,
Expand Down Expand Up @@ -188,10 +194,13 @@ async fn main() -> Result<()> {
Arc::new(
RateLimiter::builder()
.initial(max)
.max(max)
// The docs recommend twice our refill rate or our
// initial value, whichever is larger.
.max(2 * max)
.refill(limit)
.interval(Duration::from_secs(1))
// Since this is all the same geocoding job,
// Since this is all the same geocoding job, don't worry about
// fair scheduling between different worker tasks.
.fair(false)
.build(),
)
Expand Down Expand Up @@ -229,8 +238,13 @@ async fn main() -> Result<()> {
}

// Call our geocoder.
let result =
geocode_stdio(spec, Arc::from(geocoder), opt.on_duplicate_columns).await;
let result = geocode_stdio(
spec,
Arc::from(geocoder),
opt.on_duplicate_columns,
opt.max_retries,
)
.await;

// Report our metrics.
if let Err(err) = metrics_handle.report().await {
Expand Down
28 changes: 20 additions & 8 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub async fn geocode_stdio(
spec: AddressColumnSpec<String>,
geocoder: Arc<dyn Geocoder>,
on_duplicate_columns: OnDuplicateColumns,
max_retries: u8,
) -> Result<()> {
describe_counter!("geocodecsv.addresses.total", "Total addresses processed");
describe_counter!("geocodecsv.chunks.total", "Total address chunks processed");
Expand Down Expand Up @@ -106,7 +107,9 @@ pub async fn geocode_stdio(
let in_rx = ReceiverStream::new(in_rx);
let mut stream = in_rx
// Turn input messages into futures that yield output messages.
.map(move |message| geocode_message(geocoder.clone(), message).boxed())
.map(move |message| {
geocode_message(geocoder.clone(), message, max_retries).boxed()
})
// Turn output message futures into output messages in parallel.
.buffered(CONCURRENCY);

Expand Down Expand Up @@ -337,12 +340,13 @@ fn write_csv_to_stdout(rx: Receiver<Message>) -> Result<()> {
async fn geocode_message(
geocoder: Arc<dyn Geocoder>,
message: Message,
max_retries: u8,
) -> Result<Message> {
match message {
Message::Chunk(chunk) => {
trace!("geocoding {} rows", chunk.rows.len());
Ok(Message::Chunk(
geocode_chunk(geocoder.as_ref(), chunk).await?,
geocode_chunk(geocoder.as_ref(), chunk, max_retries).await?,
))
}
Message::EndOfStream => {
Expand All @@ -358,7 +362,11 @@ async fn geocode_message(
skip_all,
fields(rows = chunk.rows.len())
)]
async fn geocode_chunk(geocoder: &dyn Geocoder, mut chunk: Chunk) -> Result<Chunk> {
async fn geocode_chunk(
geocoder: &dyn Geocoder,
mut chunk: Chunk,
max_retries: u8,
) -> Result<Chunk> {
// Build a list of addresses to geocode.
let prefixes = chunk.shared.spec.prefixes();
let mut addresses = vec![];
Expand All @@ -375,20 +383,24 @@ async fn geocode_chunk(geocoder: &dyn Geocoder, mut chunk: Chunk) -> Result<Chun
let addresses_len = addresses.len();

// Geocode our addresses.
//
// TODO: Retry on failure.
trace!("geocoding {} addresses", addresses_len);
let mut failures: u8 = 0;
let mut retry_wait = Duration::from_secs(2);
let geocoded = loop {
// TODO: The `clone` here is expensive. We might want to move the
// `retry` loop inside of `street_addresses`.
let result = geocoder.geocode_addresses(&addresses).await;
match result {
Err(ref err) if failures < 5 => {
Err(ref err) if failures < max_retries => {
failures += 1;
debug!("retrying geocoder error: {:?}", err);
debug!(
"retrying geocoder error (waiting {} secs): {:?}",
retry_wait.as_secs(),
err
);
counter!("geocodecsv.chunks_retried.total", 1);
sleep(Duration::from_secs(2));
sleep(retry_wait);
retry_wait *= 2;
}
Err(err) => {
counter!("geocodecsv.chunks_failed.total", 1);
Expand Down

0 comments on commit 3ff51c8

Please sign in to comment.