Skip to content

Commit

Permalink
feat: adding a remote-settings CLI download feature, sync with local …
Browse files Browse the repository at this point in the history
…data

Adds two commands to the cargo remote-settings CLI, `dump-sync` and `dump-get`. These allow
downloading a local dump of a set of collections and keeping it up to date with the remote version.

It's also possible to open a PR right away to update this file in the app-services repo.

`cargo remote-settings dump-sync --create-pr` will create a local branch and push it to the repo.

When calling `get_records` on the component, it first checks whether the database has any records;
if not, it checks whether the collection exists locally and takes the records from the local file.
  • Loading branch information
gruberb committed Nov 19, 2024
1 parent 8cc7b4b commit 91c55bf
Show file tree
Hide file tree
Showing 12 changed files with 2,106 additions and 84 deletions.
456 changes: 392 additions & 64 deletions Cargo.lock

Large diffs are not rendered by default.

820 changes: 820 additions & 0 deletions components/remote_settings/dumps/main/search-telemetry-v2.json

Large diffs are not rendered by default.

421 changes: 405 additions & 16 deletions components/remote_settings/src/client.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion components/remote_settings/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl RemoteSettingsServer {
///
/// The difference is that it uses `Error` instead of `ApiError`. This is what we need to use
/// inside the crate.
pub(crate) fn get_url(&self) -> Result<Url> {
pub fn get_url(&self) -> Result<Url> {
Ok(match self {
Self::Prod => Url::parse("https://firefox.settings.services.mozilla.com/v1")?,
Self::Stage => Url::parse("https://firefox.settings.services.allizom.org/v1")?,
Expand Down
1 change: 1 addition & 0 deletions components/remote_settings/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl RemoteSettingsService {
) -> Result<Arc<RemoteSettingsClient>> {
let mut inner = self.inner.lock();
let storage = Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))?;

let client = Arc::new(RemoteSettingsClient::new(
inner.base_url.clone(),
inner.bucket_name.clone(),
Expand Down
4 changes: 2 additions & 2 deletions components/remote_settings/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Storage {
pub fn set_records(
&mut self,
collection_url: &str,
records: &[RemoteSettingsRecord],
records: &Vec<RemoteSettingsRecord>,
) -> Result<()> {
let tx = self.conn.transaction()?;

Expand Down Expand Up @@ -282,7 +282,7 @@ mod tests {
let collection_url = "https://example.com/api";

// Set empty records
storage.set_records(collection_url, &[])?;
storage.set_records(collection_url, &Vec::<RemoteSettingsRecord>::default())?;

// Get records
let fetched_records = storage.get_records(collection_url)?;
Expand Down
12 changes: 12 additions & 0 deletions examples/remote-settings-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@ license = "MPL-2.0"
edition = "2021"
publish = false

[lib]
name = "dump"
path = "src/dump/lib.rs"

[dependencies]
remote_settings = { path = "../../components/remote_settings" }
viaduct-reqwest = { path = "../../components/support/viaduct-reqwest" }
log = "0.4"
clap = {version = "4.2", features = ["derive"]}
anyhow = "1.0"
env_logger = { version = "0.10", default-features = false, features = ["humantime"] }
reqwest = { version = "0.12", features = ["json"] }
serde_json = "1"
futures = "0.3"
indicatif = "0.17"
tokio = { version = "1.29.1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
thiserror = "1.0.31"
walkdir = "2.4.0"
332 changes: 332 additions & 0 deletions examples/remote-settings-cli/src/dump/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
use crate::error::*;
use futures::{stream::FuturesUnordered, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use serde::de::Error;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::{path::PathBuf, sync::Arc};
use walkdir::WalkDir;

/// Name of the directory, directly below the downloader's output directory, that holds one
/// sub-directory per bucket with the JSON dump files.
const DUMPS_DIR: &str = "dumps";

/// Downloads remote-settings collections over HTTP and stores them as JSON dump files.
pub struct CollectionDownloader {
/// HTTP client used for every request this downloader sends.
client: reqwest::Client,
/// Container that the per-collection progress bars are added to.
multi_progress: Arc<MultiProgress>,
/// Directory whose `dumps` sub-directory receives the dump files; `new` makes sure it ends in
/// `components/remote_settings`.
output_dir: PathBuf,
}

/// Content of a single dump file, (de)serializable as JSON.
#[derive(Deserialize, Serialize)]
pub struct CollectionData {
/// Records of the collection, taken from the `changes` array of the downloaded changeset.
data: Vec<Value>,
/// Value of the changeset's `timestamp` field at download time.
timestamp: u64,
}

/// Outcome of a sync run; every entry is a `"<bucket>/<collection>"` key.
pub struct UpdateResult {
/// Collections whose remote timestamp was newer than the local one.
updated: Vec<String>,
/// Collections whose local timestamp is greater than or equal to the remote one.
up_to_date: Vec<String>,
/// Collections that exist locally but have no entry in the remote timestamp listing.
not_found: Vec<String>,
}

impl CollectionDownloader {
/// Creates a downloader that stores its dumps below `components/remote_settings`.
///
/// `root_path` is used as-is when it already ends in `components/remote_settings`; otherwise
/// those two components are appended to it.
pub fn new(root_path: PathBuf) -> Self {
    let mut output_dir = root_path;
    if !output_dir.ends_with("components/remote_settings") {
        output_dir.push("components");
        output_dir.push("remote_settings");
    }

    let client = reqwest::Client::new();
    let multi_progress = Arc::new(MultiProgress::new());
    Self {
        client,
        multi_progress,
        output_dir,
    }
}

pub async fn run(&self, dry_run: bool, create_pr: bool) -> Result<()> {
if dry_run && create_pr {
return Err(RemoteSettingsError::Git(
"Cannot use --dry-run with --create-pr".to_string(),
)
.into());
}

let result = self.download_all().await?;

if dry_run {
println!("\nDry run summary:");
println!("- Would update {} collections", result.updated.len());
println!(
"- {} collections already up to date",
result.up_to_date.len()
);
println!(
"- {} collections not found on remote",
result.not_found.len()
);
return Ok(());
}

println!("\nExecution summary:");
if !result.updated.is_empty() {
println!("Updated collections:");
for collection in &result.updated {
println!(" - {}", collection);
}
}

if !result.up_to_date.is_empty() {
println!("Collections already up to date:");
for collection in &result.up_to_date {
println!(" - {}", collection);
}
}

if !result.not_found.is_empty() {
println!("Collections not found on remote:");
for collection in &result.not_found {
println!(" - {}", collection);
}
}

if !result.updated.is_empty() && create_pr {
self.create_pull_request()?;
}

Ok(())
}

/// Runs the git steps for publishing updated dumps: create the
/// `remote-settings-update-dumps` branch, commit the changes and push the branch.
///
/// The `GitOps` helper is constructed with the directory two levels above `output_dir`.
/// NOTE(review): only branch/commit/push calls are visible here; whether a pull request is
/// actually opened depends on `GitOps` — confirm there.
fn create_pull_request(&self) -> Result<()> {
    const BRANCH_NAME: &str = "remote-settings-update-dumps";

    // `ancestors()` yields the path itself, then its parent, then its grandparent.
    let repo_root = self.output_dir.ancestors().nth(2).unwrap().to_path_buf();
    let git_ops = crate::git::GitOps::new(repo_root);

    git_ops.create_branch(BRANCH_NAME)?;
    git_ops.commit_changes()?;
    git_ops.push_branch(BRANCH_NAME)?;

    Ok(())
}

fn scan_local_dumps(&self) -> Result<HashMap<String, (String, u64)>> {
let mut collections = HashMap::new();
let dumps_dir = self.output_dir.join(DUMPS_DIR);

for entry in WalkDir::new(dumps_dir).min_depth(2).max_depth(2) {
let entry = entry?;
if entry.file_type().is_file()
&& entry.path().extension().map_or(false, |ext| ext == "json")
{
// Get bucket name from parent directory
let bucket = entry
.path()
.parent()
.and_then(|p| p.file_name())
.and_then(|n| n.to_str())
.ok_or_else(|| RemoteSettingsError::Path("Invalid bucket path".into()))?;

// Get collection name from filename
let collection_name = entry
.path()
.file_stem()
.and_then(|n| n.to_str())
.ok_or_else(|| RemoteSettingsError::Path("Invalid collection name".into()))?;

// Read and parse the file to get timestamp
let content = std::fs::read_to_string(entry.path())?;
let data: serde_json::Value = serde_json::from_str(&content)?;
let timestamp = data["timestamp"].as_u64().ok_or_else(|| {
RemoteSettingsError::Json(serde_json::Error::custom("No timestamp found"))
})?;

collections.insert(
format!("{}/{}", bucket, collection_name),
(bucket.to_string(), timestamp),
);
}
}
Ok(collections)
}

/// Fetches the `last_modified` timestamp of every collection listed by the monitor endpoint
/// of the production server.
///
/// Returns a map from `"<bucket>/<collection>"` to `last_modified`. Records that lack a string
/// `bucket`, a string `collection` or an unsigned integer `last_modified` are skipped.
///
/// # Errors
///
/// Fails when the request cannot be sent, when the server answers with an error status, when
/// the body is not JSON, or when the body has no `data` array.
async fn fetch_timestamps(&self) -> Result<HashMap<String, u64>> {
    let monitor_url = format!(
        "{}/buckets/monitor/collections/changes/records",
        "https://firefox.settings.services.mozilla.com/v1"
    );

    let monitor_response: Value = self
        .client
        .get(&monitor_url)
        .send()
        .await?
        // Surface 4xx/5xx responses as HTTP errors instead of trying to interpret an error
        // body, which would otherwise be reported as a missing `data` array.
        .error_for_status()?
        .json()
        .await?;

    let records = monitor_response["data"].as_array().ok_or_else(|| {
        RemoteSettingsError::Json(serde_json::Error::custom(
            "No data array in monitor response",
        ))
    })?;

    Ok(records
        .iter()
        .filter_map(|record| {
            let bucket = record["bucket"].as_str()?;
            let collection_name = record["collection"].as_str()?;
            let last_modified = record["last_modified"].as_u64()?;
            Some((format!("{}/{}", bucket, collection_name), last_modified))
        })
        .collect())
}

async fn fetch_collection(
&self,
collection_name: String,
last_modified: u64,
pb: ProgressBar,
) -> Result<(String, CollectionData)> {
let parts: Vec<&str> = collection_name.split('/').collect();
if parts.len() != 2 {
return Err(RemoteSettingsError::Json(serde_json::Error::custom(
"Invalid collection name format",
))
.into());
}
let (bucket, name) = (parts[0], parts[1]);

let url = format!(
"{}/buckets/{}/collections/{}/changeset?_expected={}",
"https://firefox.settings.services.mozilla.com/v1", bucket, name, last_modified
);

pb.set_message(format!("Downloading {}", name));

let response = self.client.get(&url).send().await?;
let changeset: Value = response.json().await?;

let timestamp = changeset["timestamp"].as_u64().ok_or_else(|| {
RemoteSettingsError::Json(serde_json::Error::custom("No timestamp in changeset"))
})?;

pb.finish_with_message(format!("Downloaded {}", name));

Ok((
collection_name,
CollectionData {
data: changeset["changes"]
.as_array()
.unwrap_or(&Vec::new())
.to_vec(),
timestamp,
},
))
}

/// Updates every local dump whose remote timestamp is newer than the local one.
///
/// Creates the dumps directory if needed, compares each local dump with the remote timestamp
/// listing, downloads the outdated collections and overwrites their dump files. Only
/// collections that already have a local dump are considered.
///
/// Returns which collections were updated, which were already up to date and which have no
/// entry on the remote.
///
/// # Errors
///
/// Propagates the first error from scanning the local dumps, fetching the timestamps,
/// downloading a collection or writing a dump file. Dumps written before a failing download
/// stay on disk.
pub async fn download_all(&self) -> Result<UpdateResult> {
    let dumps_dir = self.output_dir.join(DUMPS_DIR);
    std::fs::create_dir_all(&dumps_dir)?;

    let local_collections = self.scan_local_dumps()?;
    if local_collections.is_empty() {
        println!("No local collections found in {:?}", dumps_dir);
        return Ok(UpdateResult {
            updated: vec![],
            up_to_date: vec![],
            not_found: vec![],
        });
    }

    let remote_timestamps = self.fetch_timestamps().await?;
    let mut futures = FuturesUnordered::new();
    let mut up_to_date = Vec::new();
    let mut not_found = Vec::new();

    // Only check collections we have locally
    for (collection_key, (_, local_timestamp)) in local_collections {
        let remote_timestamp = match remote_timestamps.get(&collection_key) {
            Some(&timestamp) => timestamp,
            None => {
                println!("Warning: Collection {} not found on remote", collection_key);
                not_found.push(collection_key);
                continue;
            }
        };

        if local_timestamp >= remote_timestamp {
            println!("Collection {} is up to date", collection_key);
            up_to_date.push(collection_key);
            continue;
        }

        println!("Collection {} needs update", collection_key);

        // Register a progress bar only for collections that are actually downloaded, so
        // up-to-date collections do not leave idle bars behind.
        let pb = self.multi_progress.add(ProgressBar::new(100));
        pb.set_style(
            ProgressStyle::default_bar()
                .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
                .expect("progress bar template is a valid constant"),
        );

        futures.push(self.fetch_collection(collection_key, remote_timestamp, pb));
    }

    // Downloads run concurrently; each dump is written as soon as its download completes.
    let mut updated = Vec::new();
    while let Some(result) = futures.next().await {
        let (collection, data) = result?;
        self.write_collection_file(&collection, &data)?;
        updated.push(collection);
    }

    Ok(UpdateResult {
        updated,
        up_to_date,
        not_found,
    })
}

/// Downloads one collection and writes it to `<output_dir>/dumps/<bucket>/<collection_name>.json`.
///
/// The collection is requested with `0` as the expected timestamp, regardless of any local
/// dump that may already exist.
///
/// # Errors
///
/// Propagates errors from creating the dumps directory, downloading the collection or writing
/// the dump file.
pub async fn download_single(&self, bucket: &str, collection_name: &str) -> Result<()> {
    let dumps_dir = self.output_dir.join(DUMPS_DIR);
    std::fs::create_dir_all(&dumps_dir)?;

    let pb = self.multi_progress.add(ProgressBar::new(100));
    pb.set_style(
        ProgressStyle::default_bar()
            .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
            .expect("progress bar template is a valid constant"),
    );

    // `fetch_collection` hands the key back, so it does not need to be cloned up front.
    let (collection_key, data) = self
        .fetch_collection(format!("{}/{}", bucket, collection_name), 0, pb)
        .await?;

    // Write to file
    self.write_collection_file(&collection_key, &data)?;

    // Print the real target path; formatting the directory with `{:?}` wrapped only the
    // directory part in quotes and produced a path that could not be copied as-is.
    let dump_path = dumps_dir
        .join(bucket)
        .join(format!("{}.json", collection_name));
    println!(
        "Successfully downloaded collection to {}",
        dump_path.display()
    );

    Ok(())
}

/// Serializes `data` as pretty-printed JSON into `<output_dir>/dumps/<bucket>/<name>.json`.
///
/// `collection` must be a `"<bucket>/<name>"` key. The bucket directory is created when it
/// does not exist yet, and an existing dump file is overwritten.
///
/// # Errors
///
/// Returns a `RemoteSettingsError::Path` error when `collection` does not consist of exactly
/// two `/`-separated parts, and propagates file-system and serialization errors.
fn write_collection_file(&self, collection: &str, data: &CollectionData) -> Result<()> {
    let mut segments = collection.split('/');
    let (bucket, name) = match (segments.next(), segments.next(), segments.next()) {
        (Some(bucket), Some(name), None) => (bucket, name),
        _ => return Err(RemoteSettingsError::Path("Invalid collection path".into()).into()),
    };

    // Every dump lives in a directory named after its bucket.
    let bucket_dir = self.output_dir.join(DUMPS_DIR).join(bucket);
    std::fs::create_dir_all(&bucket_dir)?;

    let json = serde_json::to_string_pretty(data)?;
    std::fs::write(bucket_dir.join(format!("{}.json", name)), json)?;

    Ok(())
}
}
Loading

0 comments on commit 91c55bf

Please sign in to comment.