diff --git a/src/main.rs b/src/main.rs index 251b945..5e8d59e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -135,6 +135,12 @@ struct Opt { #[structopt(long = "max-addresses-per-second")] max_addresses_per_second: Option, + /// How many times should we retry a failed geocoding block? Each retry + /// takes twice as long as the last. The current default value will result + /// in giving up after about 30 seconds. + #[structopt(long = "max-retries", default_value = "4")] + max_retries: u8, + /// Labels to attach to reported metrics. Recommended: "source=$SOURCE". #[structopt(long = "metrics-label", value_name = "KEY=VALUE")] metrics_labels: Vec, @@ -188,10 +194,13 @@ async fn main() -> Result<()> { Arc::new( RateLimiter::builder() .initial(max) - .max(max) + // The docs recommend twice our refill rate or our + // initial value, whichever is larger. + .max(2 * max) .refill(limit) .interval(Duration::from_secs(1)) - // Since this is all the same geocoding job, + // Since this is all the same geocoding job, don't worry about + // fair scheduling between different worker tasks. .fair(false) .build(), ) @@ -229,8 +238,13 @@ async fn main() -> Result<()> { } // Call our geocoder. - let result = - geocode_stdio(spec, Arc::from(geocoder), opt.on_duplicate_columns).await; + let result = geocode_stdio( + spec, + Arc::from(geocoder), + opt.on_duplicate_columns, + opt.max_retries, + ) + .await; // Report our metrics. if let Err(err) = metrics_handle.report().await { diff --git a/src/pipeline.rs b/src/pipeline.rs index 74f9595..bf6ca25 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -72,6 +72,7 @@ pub async fn geocode_stdio( spec: AddressColumnSpec, geocoder: Arc, on_duplicate_columns: OnDuplicateColumns, + max_retries: u8, ) -> Result<()> { describe_counter!("geocodecsv.addresses.total", "Total addresses processed"); describe_counter!("geocodecsv.chunks.total", "Total address chunks processed"); @@ -106,7 +107,9 @@ pub async fn geocode_stdio( let in_rx = ReceiverStream::new(in_rx); let mut stream = in_rx // Turn input messages into futures that yield output messages. - .map(move |message| geocode_message(geocoder.clone(), message).boxed()) + .map(move |message| { + geocode_message(geocoder.clone(), message, max_retries).boxed() + }) // Turn output message futures into output messages in parallel. .buffered(CONCURRENCY); @@ -337,12 +340,13 @@ fn write_csv_to_stdout(rx: Receiver) -> Result<()> { async fn geocode_message( geocoder: Arc, message: Message, + max_retries: u8, ) -> Result { match message { Message::Chunk(chunk) => { trace!("geocoding {} rows", chunk.rows.len()); Ok(Message::Chunk( - geocode_chunk(geocoder.as_ref(), chunk).await?, + geocode_chunk(geocoder.as_ref(), chunk, max_retries).await?, )) } Message::EndOfStream => { @@ -358,7 +362,11 @@ async fn geocode_message( skip_all, fields(rows = chunk.rows.len()) )] -async fn geocode_chunk(geocoder: &dyn Geocoder, mut chunk: Chunk) -> Result { +async fn geocode_chunk( + geocoder: &dyn Geocoder, + mut chunk: Chunk, + max_retries: u8, +) -> Result { // Build a list of addresses to geocode. let prefixes = chunk.shared.spec.prefixes(); let mut addresses = vec![]; @@ -375,20 +383,24 @@ async fn geocode_chunk(geocoder: &dyn Geocoder, mut chunk: Chunk) -> Result { + Err(ref err) if failures < max_retries => { failures += 1; - debug!("retrying geocoder error: {:?}", err); + debug!( + "retrying geocoder error (waiting {} secs): {:?}", + retry_wait.as_secs(), + err + ); counter!("geocodecsv.chunks_retried.total", 1); - sleep(Duration::from_secs(2)); + sleep(retry_wait); + retry_wait *= 2; } Err(err) => { counter!("geocodecsv.chunks_failed.total", 1);