From 2e9ae09daed82ce691355071c131999e0b6c3769 Mon Sep 17 00:00:00 2001 From: Eric Kidd Date: Thu, 11 May 2023 07:45:07 -0400 Subject: [PATCH] Skip geocoding for records with empty addresses Geocoding these addresses with Smarty now triggers a hard failure. --- src/addresses.rs | 6 +++ src/geocoders/cache/mod.rs | 2 +- src/geocoders/invalid_record_skipper.rs | 72 +++++++++++++++++++++++++ src/geocoders/mod.rs | 6 ++- src/main.rs | 16 +++--- tests/specs.rs | 38 +++++++++++++ 6 files changed, 131 insertions(+), 9 deletions(-) create mode 100644 src/geocoders/invalid_record_skipper.rs diff --git a/src/addresses.rs b/src/addresses.rs index 6535681..6bb79d1 100644 --- a/src/addresses.rs +++ b/src/addresses.rs @@ -27,6 +27,12 @@ pub struct Address { } impl Address { + /// A valid `Address` has a non-empty `street` field. (And whitespace + /// doesn't count as non-empty.) + pub fn is_valid(&self) -> bool { + !self.street.trim().is_empty() + } + /// The `city` field, or an empty string. pub fn city_str(&self) -> &str { self.city.as_ref().map(|s| &s[..]).unwrap_or("") diff --git a/src/geocoders/cache/mod.rs b/src/geocoders/cache/mod.rs index 23e855f..d9270d0 100644 --- a/src/geocoders/cache/mod.rs +++ b/src/geocoders/cache/mod.rs @@ -186,7 +186,7 @@ impl Geocoder for Cache { // Encode our value for caching. let value = retry.as_ref().map(|retry| &retry.column_values); encoded.clear(); - bincode::encode_into_std_write(&value, &mut encoded, bincode_config) + bincode::encode_into_std_write(value, &mut encoded, bincode_config) .context("could not encode value for caching")?; // Compress our encoded value and add it to our pipeline set. diff --git a/src/geocoders/invalid_record_skipper.rs b/src/geocoders/invalid_record_skipper.rs new file mode 100644 index 0000000..0b57a23 --- /dev/null +++ b/src/geocoders/invalid_record_skipper.rs @@ -0,0 +1,72 @@ +//! Skip invalid addresses and don't pass them through to the next layer. + +use async_trait::async_trait; +use metrics::{counter, describe_counter}; + +use crate::addresses::Address; + +use super::{Geocoded, Geocoder, Result}; + +/// Skip invalid addresses and don't pass them through to the next layer. +pub struct InvalidRecordSkipper { + // Our inner geocoder that we're normalizing for. + inner: Box, +} + +impl InvalidRecordSkipper { + /// Create a new `Normalizer` wrapping the specified geocoder. + pub fn new(inner: Box) -> InvalidRecordSkipper { + describe_counter!( + "geocodecsv.invalid_records.total", + "Invalid records which could not be geocoded" + ); + + InvalidRecordSkipper { inner } + } +} + +#[async_trait] +impl Geocoder for InvalidRecordSkipper { + fn tag(&self) -> &str { + // We don't change anything which could possibly affect caching, + // so we can just use our inner tag. + self.inner.tag() + } + + fn configuration_key(&self) -> &str { + self.inner.configuration_key() + } + + fn column_names(&self) -> &[String] { + self.inner.column_names() + } + + async fn geocode_addresses( + &self, + addresses: &[Address], + ) -> Result>> { + // Extract our valid addresses, keeping track of their original + // positions. + let mut original_indices = vec![]; + let mut valid_addresses = vec![]; + for (i, address) in addresses.iter().enumerate() { + if address.is_valid() { + valid_addresses.push(address.clone()); + original_indices.push(i); + } else { + counter!("geocodecsv.invalid_records.total", 1); + } + } + + // Geocode our valid addresses. + let geocodeded = self.inner.geocode_addresses(&valid_addresses).await?; + + // Rebuild our geocoded addresses, inserting `None` for invalid. + let mut result = vec![None; addresses.len()]; + for (i, geocoded) in geocodeded.into_iter().enumerate() { + let original_index = original_indices[i]; + result[original_index] = geocoded; + } + Ok(result) + } +} diff --git a/src/geocoders/mod.rs b/src/geocoders/mod.rs index 8c49930..3f73a3b 100644 --- a/src/geocoders/mod.rs +++ b/src/geocoders/mod.rs @@ -16,6 +16,7 @@ use crate::{ }; pub mod cache; +pub mod invalid_record_skipper; pub mod libpostal; pub mod normalizer; pub mod smarty; @@ -55,6 +56,9 @@ pub enum MatchStrategy { Enhanced, } +// `#derive(Default)` would actually do the right thing, but we want to make the +// default explicit for the reader. +#[allow(clippy::derivable_impls)] impl Default for MatchStrategy { fn default() -> Self { MatchStrategy::Strict @@ -122,7 +126,7 @@ pub trait Geocoder: Send + Sync + 'static { let mut hasher = Sha256::new(); for column_name in self.column_names() { hasher.update(column_name.as_bytes()); - hasher.update(&[0]); + hasher.update([0]); } hasher.update(self.configuration_key()); let hash = hasher.finalize(); diff --git a/src/main.rs b/src/main.rs index 5e8d59e..f68bbd6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,12 +5,6 @@ pub use anyhow::Result; use anyhow::{format_err, Error}; -use geocoders::cache::Cache; -use geocoders::libpostal::LibPostal; -use geocoders::normalizer::Normalizer; -use geocoders::smarty::Smarty; -use geocoders::{shared_http_client, Geocoder}; -use key_value_stores::KeyValueStore; use leaky_bucket::RateLimiter; use metrics::describe_counter; use opinionated_metrics::Mode; @@ -37,7 +31,12 @@ mod pipeline; mod unpack_vec; use crate::addresses::AddressColumnSpec; -use crate::geocoders::MatchStrategy; +use crate::geocoders::{ + cache::Cache, invalid_record_skipper::InvalidRecordSkipper, libpostal::LibPostal, + normalizer::Normalizer, shared_http_client, smarty::Smarty, Geocoder, + MatchStrategy, +}; +use crate::key_value_stores::KeyValueStore; use crate::pipeline::{geocode_stdio, OnDuplicateColumns, CONCURRENCY, GEOCODE_SIZE}; /// Underlying geocoders we can use. (Helper struct for argument parsing.) @@ -237,6 +236,9 @@ async fn main() -> Result<()> { geocoder = Box::new(Normalizer::new(geocoder)); } + // Always skip invalid records. + geocoder = Box::new(InvalidRecordSkipper::new(geocoder)); + // Call our geocoder. let result = geocode_stdio( spec, diff --git a/tests/specs.rs b/tests/specs.rs index cf764ea..435cb2d 100644 --- a/tests/specs.rs +++ b/tests/specs.rs @@ -217,3 +217,41 @@ fn rate_limiter() { .expect_success(); assert!(output.stdout_str().contains("shipping_addressee")); } + +#[test] +#[ignore] +fn skip_records_with_empty_house_number_and_street() { + let testdir = TestDir::new( + "geocode-csv", + "skip_records_with_empty_house_number_and_street", + ); + + testdir.create_file( + "spec.json", + r#"{ + "shipping": { + "house_number_and_street": [ + "address_1", + "address_2" + ], + "postcode": "zip_code" + } +}"#, + ); + + let output = testdir + .cmd() + .arg("--license=us-core-enterprise-cloud") + .arg("--spec=spec.json") + .output_with_stdin( + r#"address_1,address_2,city,state,zip_code +,,New York,NY,10118 + , ,Provo,UT, +"#, + ) + .expect_success(); + // We output all lines, without geocoding any that lack a street address. + assert!(output.stdout_str().contains("shipping_addressee")); + assert!(output.stdout_str().contains("New York")); + assert!(output.stdout_str().contains("Provo")); +}