From d305778af1f2fccfeec52407655159806f290659 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Thu, 5 Dec 2024 17:51:44 +0100 Subject: [PATCH] Remote Settings: store full changesets instead of just records --- components/remote_settings/src/client.rs | 150 ++++++++++++---- components/remote_settings/src/schema.rs | 15 +- components/remote_settings/src/storage.rs | 200 +++++++++++++++++----- 3 files changed, 281 insertions(+), 84 deletions(-) diff --git a/components/remote_settings/src/client.rs b/components/remote_settings/src/client.rs index 2226649042..a1b8e0669a 100644 --- a/components/remote_settings/src/client.rs +++ b/components/remote_settings/src/client.rs @@ -151,9 +151,15 @@ impl RemoteSettingsClient { .get_last_modified_timestamp(&collection_url)? .unwrap_or(0); if packaged_data.timestamp > cached_timestamp { - inner - .storage - .set_records(&collection_url, &packaged_data.data)?; + // Remove previously cached data (packaged data does not have tombstones like diff responses do). + inner.storage.empty()?; + // Insert new packaged data. + inner.storage.insert_collection_content( + &collection_url, + &packaged_data.data, + packaged_data.timestamp, + CollectionMetadata::default(), + )?; return Ok(Some(self.filter_records(packaged_data.data))); } } @@ -168,9 +174,14 @@ impl RemoteSettingsClient { (Some(cached_records), _) => Some(self.filter_records(cached_records)), // Case 3: sync_if_empty=true (None, true) => { - let records = inner.api_client.get_records(None)?; - inner.storage.set_records(&collection_url, &records)?; - Some(self.filter_records(records)) + let changeset = inner.api_client.fetch_changeset(None)?; + inner.storage.insert_collection_content( + &collection_url, + &changeset.changes, + changeset.timestamp, + changeset.metadata, + )?; + Some(self.filter_records(changeset.changes)) } // Case 4: Nothing to return (None, false) => None, @@ -181,8 +192,13 @@ impl RemoteSettingsClient { let mut inner = self.inner.lock(); let collection_url = inner.api_client.collection_url(); let mtime = inner.storage.get_last_modified_timestamp(&collection_url)?; - let records = inner.api_client.get_records(mtime)?; - inner.storage.merge_records(&collection_url, &records) + let changeset = inner.api_client.fetch_changeset(mtime)?; + inner.storage.insert_collection_content( + &collection_url, + &changeset.changes, + changeset.timestamp, + changeset.metadata, + ) } /// Downloads an attachment from [attachment_location]. NOTE: there are no guarantees about a @@ -221,7 +237,7 @@ impl RemoteSettingsClient { } // Try to download the attachment because neither the storage nor the local data had it - let attachment = inner.api_client.get_attachment(&metadata.location)?; + let attachment = inner.api_client.fetch_attachment(&metadata.location)?; // Verify downloaded data if attachment.len() as u64 != metadata.size { @@ -284,10 +300,10 @@ pub trait ApiClient { fn collection_url(&self) -> String; /// Fetch records from the server - fn get_records(&mut self, timestamp: Option) -> Result>; + fn fetch_changeset(&mut self, timestamp: Option) -> Result; /// Fetch an attachment from the server - fn get_attachment(&mut self, attachment_location: &str) -> Result>; + fn fetch_attachment(&mut self, attachment_location: &str) -> Result>; /// Check if this client is pointing to the production server fn is_prod_server(&self) -> Result; @@ -372,7 +388,7 @@ impl ApiClient for ViaductApiClient { self.endpoints.collection_url.to_string() } - fn get_records(&mut self, timestamp: Option) -> Result> { + fn fetch_changeset(&mut self, timestamp: Option) -> Result { let mut url = self.endpoints.changeset_url.clone(); // 0 is used as an arbitrary value for `_expected` because the current implementation does // not leverage push timestamps or polling from the monitor/changes endpoint. More @@ -388,7 +404,7 @@ impl ApiClient for ViaductApiClient { let resp = self.make_request(url)?; if resp.is_success() { - Ok(resp.json::()?.changes) + Ok(resp.json::()?) } else { Err(Error::ResponseError(format!( "status code: {}", @@ -397,7 +413,7 @@ impl ApiClient for ViaductApiClient { } } - fn get_attachment(&mut self, attachment_location: &str) -> Result> { + fn fetch_attachment(&mut self, attachment_location: &str) -> Result> { let attachments_base_url = match &self.remote_state.attachments_base_url { Some(attachments_base_url) => attachments_base_url.to_owned(), None => { @@ -706,9 +722,24 @@ struct RecordsResponse { data: Vec, } -#[derive(Deserialize, Serialize)] -struct ChangesetResponse { +#[derive(Clone, Deserialize, Serialize)] +pub struct ChangesetResponse { changes: Vec, + timestamp: u64, + metadata: CollectionMetadata, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +pub struct CollectionMetadata { + pub bucket: String, + pub signature: CollectionSignature, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +pub struct CollectionSignature { + pub signature: String, + /// X.509 certificate chain Url (x5u) + pub x5u: String, } /// A parsed Remote Settings record. Records can contain arbitrary fields, so clients @@ -717,6 +748,7 @@ struct ChangesetResponse { pub struct RemoteSettingsRecord { pub id: String, pub last_modified: u64, + /// Tombstone flag (see https://remote-settings.readthedocs.io/en/latest/client-specifications.html#local-state) #[serde(default)] pub deleted: bool, pub attachment: Option, @@ -1705,14 +1737,24 @@ mod test_new_client { attachment: None, fields: json!({"foo": "bar"}).as_object().unwrap().clone(), }]; + let changeset = ChangesetResponse { + changes: records.clone(), + timestamp: 42, + metadata: CollectionMetadata { + bucket: "main".into(), + signature: CollectionSignature { + signature: "b64sig".into(), + x5u: "http://x5u.com".into(), + }, + }, + }; api_client.expect_collection_url().returning(|| { "http://rs.example.com/v1/buckets/main/collections/test-collection".into() }); - api_client.expect_get_records().returning({ - let records = records.clone(); + api_client.expect_fetch_changeset().returning({ move |timestamp| { assert_eq!(timestamp, None); - Ok(records.clone()) + Ok(changeset.clone()) } }); api_client.expect_is_prod_server().returning(|| Ok(false)); @@ -1748,14 +1790,19 @@ mod jexl_tests { .unwrap() .clone(), }]; + let changeset = ChangesetResponse { + changes: records.clone(), + timestamp: 42, + metadata: CollectionMetadata::default(), + }; api_client.expect_collection_url().returning(|| { "http://rs.example.com/v1/buckets/main/collections/test-collection".into() }); - api_client.expect_get_records().returning({ - let records = records.clone(); + api_client.expect_fetch_changeset().returning({ + let changeset = changeset.clone(); move |timestamp| { assert_eq!(timestamp, None); - Ok(records.clone()) + Ok(changeset.clone()) } }); api_client.expect_is_prod_server().returning(|| Ok(false)); @@ -1766,9 +1813,11 @@ mod jexl_tests { }; let mut storage = Storage::new(":memory:".into()).expect("Error creating storage"); - let _ = storage.set_records( + let _ = storage.insert_collection_content( "http://rs.example.com/v1/buckets/main/collections/test-collection", &records, + 42, + CollectionMetadata::default(), ); let rs_client = RemoteSettingsClient::new_from_parts( @@ -1799,14 +1848,19 @@ mod jexl_tests { .unwrap() .clone(), }]; + let changeset = ChangesetResponse { + changes: records.clone(), + timestamp: 42, + metadata: CollectionMetadata::default(), + }; api_client.expect_collection_url().returning(|| { "http://rs.example.com/v1/buckets/main/collections/test-collection".into() }); - api_client.expect_get_records().returning({ - let records = records.clone(); + api_client.expect_fetch_changeset().returning({ + let changeset = changeset.clone(); move |timestamp| { assert_eq!(timestamp, None); - Ok(records.clone()) + Ok(changeset.clone()) } }); api_client.expect_is_prod_server().returning(|| Ok(false)); @@ -1817,9 +1871,11 @@ mod jexl_tests { }; let mut storage = Storage::new(":memory:".into()).expect("Error creating storage"); - let _ = storage.set_records( + let _ = storage.insert_collection_content( "http://rs.example.com/v1/buckets/main/collections/test-collection", &records, + 42, + CollectionMetadata::default(), ); let rs_client = RemoteSettingsClient::new_from_parts( @@ -1902,7 +1958,12 @@ mod cached_data_tests { let mut api_client = MockApiClient::new(); let mut storage = Storage::new(":memory:".into())?; - storage.set_records(collection_url, &vec![old_record.clone()])?; + storage.insert_collection_content( + collection_url, + &vec![old_record.clone()], + 42, + CollectionMetadata::default(), + )?; api_client .expect_collection_url() @@ -1962,10 +2023,21 @@ mod cached_data_tests { attachment: None, fields: serde_json::Map::new(), }]; + let changeset = ChangesetResponse { + changes: expected_records.clone(), + timestamp: 42, + metadata: CollectionMetadata { + bucket: "main".into(), + signature: CollectionSignature { + signature: "b64sig".into(), + x5u: "http://x5u.com".into(), + }, + }, + }; api_client - .expect_get_records() + .expect_fetch_changeset() .withf(|timestamp| timestamp.is_none()) - .returning(move |_| Ok(expected_records.clone())); + .returning(move |_| Ok(changeset.clone())); let rs_client = RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client); @@ -2012,7 +2084,7 @@ mod cached_data_tests { api_client.expect_is_prod_server().returning(|| Ok(true)); // Since sync_if_empty is false, get_records should not be called - // No need to set expectation for api_client.get_records + // No need to set expectation for api_client.fetch_changeset let rs_client = RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client); @@ -2046,7 +2118,12 @@ mod cached_data_tests { attachment: None, fields: serde_json::Map::new(), }]; - storage.set_records(&collection_url, &cached_records)?; + storage.insert_collection_content( + &collection_url, + &cached_records, + 42, + CollectionMetadata::default(), + )?; api_client .expect_collection_url() @@ -2082,7 +2159,12 @@ mod cached_data_tests { // Set up empty cached records let cached_records: Vec = vec![]; - storage.set_records(&collection_url, &cached_records)?; + storage.insert_collection_content( + &collection_url, + &cached_records, + 42, + CollectionMetadata::default(), + )?; api_client .expect_collection_url() @@ -2210,7 +2292,7 @@ mod test_packaged_metadata { .returning(move || collection_url.clone()); api_client.expect_is_prod_server().returning(|| Ok(true)); api_client - .expect_get_attachment() + .expect_fetch_attachment() .returning(move |_| Ok(mock_api_data.clone())); let rs_client = diff --git a/components/remote_settings/src/schema.rs b/components/remote_settings/src/schema.rs index ec48361b38..d278d62336 100644 --- a/components/remote_settings/src/schema.rs +++ b/components/remote_settings/src/schema.rs @@ -13,7 +13,7 @@ use sql_support::open_database::{self, ConnectionInitializer}; /// 1. Bump this version. /// 2. Add a migration from the old version to the new version in /// [`RemoteSettingsConnectionInitializer::upgrade_from`]. -pub const VERSION: u32 = 1; +pub const VERSION: u32 = 2; /// The current remote settings database schema. pub const SQL: &str = r#" @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS attachments ( data BLOB NOT NULL); CREATE TABLE IF NOT EXISTS collection_metadata ( collection_url TEXT PRIMARY KEY, - last_modified INTEGER); + last_modified INTEGER, bucket TEXT, signature TEXT, x5u TEXT); "#; /// Initializes an SQLite connection to the Remote Settings database, performing @@ -37,7 +37,7 @@ pub struct RemoteSettingsConnectionInitializer; impl ConnectionInitializer for RemoteSettingsConnectionInitializer { const NAME: &'static str = "remote_settings"; - const END_VERSION: u32 = 1; + const END_VERSION: u32 = 2; fn prepare(&self, conn: &Connection, _db_empty: bool) -> open_database::Result<()> { let initial_pragmas = " @@ -63,6 +63,15 @@ impl ConnectionInitializer for RemoteSettingsConnectionInitializer { tx.execute("ALTER TABLE collection_metadata DROP column fetched", ())?; Ok(()) } + 1 => { + tx.execute("ALTER TABLE collection_metadata ADD COLUMN bucket TEXT", ())?; + tx.execute( + "ALTER TABLE collection_metadata ADD COLUMN signature TEXT", + (), + )?; + tx.execute("ALTER TABLE collection_metadata ADD COLUMN x5u TEXT", ())?; + Ok(()) + } _ => Err(open_database::Error::IncompatibleVersion(version)), } } diff --git a/components/remote_settings/src/storage.rs b/components/remote_settings/src/storage.rs index 17cffca8e4..b27fb691e7 100644 --- a/components/remote_settings/src/storage.rs +++ b/components/remote_settings/src/storage.rs @@ -2,6 +2,10 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use crate::{ + client::CollectionMetadata, client::CollectionSignature, + schema::RemoteSettingsConnectionInitializer, Attachment, RemoteSettingsRecord, Result, +}; use camino::Utf8PathBuf; use rusqlite::{params, Connection, OpenFlags, OptionalExtension, Transaction}; use serde_json; @@ -9,10 +13,6 @@ use sha2::{Digest, Sha256}; use sql_support::{open_database::open_database_with_flags, ConnExt}; -use crate::{ - schema::RemoteSettingsConnectionInitializer, Attachment, RemoteSettingsRecord, Result, -}; - /// Internal storage type /// /// This will store downloaded records/attachments in a SQLite database. Nothing is implemented @@ -43,7 +43,7 @@ impl Storage { /// Get the last modified timestamp for the stored records /// /// Returns None if no records are stored or if `collection_url` does not match the - /// last `collection_url` passed to `set_records` / `merge_records` + /// last `collection_url` passed to `insert_collection_content` pub fn get_last_modified_timestamp(&self, collection_url: &str) -> Result> { let mut stmt = self .conn @@ -57,7 +57,7 @@ impl Storage { /// Get cached records for this collection /// /// Returns None if no records are stored or if `collection_url` does not match the `collection_url` passed - /// to `set_records`. + /// to `insert_collection_content`. pub fn get_records( &mut self, collection_url: &str, @@ -85,6 +85,36 @@ impl Storage { result } + /// Get cached metadata for this collection + /// + /// Returns None if no data is stored or if `collection_url` does not match the `collection_url` passed + /// to `insert_collection_content`. + pub fn get_collection_metadata( + &self, + collection_url: &str, + ) -> Result> { + let mut stmt_metadata = self.conn.prepare( + "SELECT bucket, signature, x5u FROM collection_metadata WHERE collection_url = ?", + )?; + + if let Some(metadata) = stmt_metadata + .query_row(params![collection_url], |row| { + Ok(CollectionMetadata { + bucket: row.get(0).unwrap_or_default(), + signature: CollectionSignature { + signature: row.get(1).unwrap_or_default(), + x5u: row.get(2).unwrap_or_default(), + }, + }) + }) + .optional()? + { + Ok(Some(metadata)) + } else { + Ok(None) + } + } + /// Get cached attachment data /// /// This returns the last attachment data sent to [Self::set_attachment]. @@ -120,30 +150,13 @@ impl Storage { } } - /// Set the list of records stored in the database, clearing out any previously stored records - pub fn set_records( - &mut self, - collection_url: &str, - records: &[RemoteSettingsRecord], - ) -> Result<()> { - let tx = self.conn.transaction()?; - - tx.execute("DELETE FROM records", [])?; - tx.execute("DELETE FROM collection_metadata", [])?; - let max_last_modified = Self::update_record_rows(&tx, collection_url, records)?; - Self::update_collection_metadata(&tx, collection_url, max_last_modified)?; - tx.commit()?; - Ok(()) - } - - /// Merge new records with records stored in the database - /// - /// Records with `deleted=false` will be inserted into the DB, replacing any previously stored - /// records with the same ID. Records with `deleted=true` will be removed. - pub fn merge_records( + /// Set cached content for this collection. + pub fn insert_collection_content( &mut self, collection_url: &str, records: &[RemoteSettingsRecord], + last_modified: u64, + metadata: CollectionMetadata, ) -> Result<()> { let tx = self.conn.transaction()?; @@ -159,8 +172,9 @@ impl Storage { "DELETE FROM collection_metadata where collection_url <> ?", [collection_url], )?; - let max_last_modified = Self::update_record_rows(&tx, collection_url, records)?; - Self::update_collection_metadata(&tx, collection_url, max_last_modified)?; + + Self::update_record_rows(&tx, collection_url, records)?; + Self::update_collection_metadata(&tx, collection_url, last_modified, metadata)?; tx.commit()?; Ok(()) } @@ -198,11 +212,20 @@ impl Storage { tx: &Transaction<'_>, collection_url: &str, last_modified: u64, + metadata: CollectionMetadata, ) -> Result<()> { // Update the metadata tx.execute( - "INSERT OR REPLACE INTO collection_metadata (collection_url, last_modified) VALUES (?, ?)", - (collection_url, last_modified), + "INSERT OR REPLACE INTO collection_metadata \ + (collection_url, last_modified, bucket, signature, x5u) \ + VALUES (?, ?, ?, ?, ?)", + ( + collection_url, + last_modified, + metadata.bucket, + metadata.signature.signature, + metadata.signature.x5u, + ), )?; Ok(()) } @@ -250,7 +273,10 @@ impl Storage { #[cfg(test)] mod tests { use super::Storage; - use crate::{Attachment, RemoteSettingsRecord, Result, RsJsonObject}; + use crate::{ + client::CollectionMetadata, client::CollectionSignature, Attachment, RemoteSettingsRecord, + Result, RsJsonObject, + }; use sha2::{Digest, Sha256}; #[test] @@ -282,7 +308,12 @@ mod tests { ]; // Set records - storage.set_records(collection_url, &records)?; + storage.insert_collection_content( + collection_url, + &records, + 300, + CollectionMetadata::default(), + )?; // Get records let fetched_records = storage.get_records(collection_url)?; @@ -295,7 +326,7 @@ mod tests { // Get last modified timestamp let last_modified = storage.get_last_modified_timestamp(collection_url)?; - assert_eq!(last_modified, Some(200)); + assert_eq!(last_modified, Some(300)); Ok(()) } @@ -324,7 +355,12 @@ mod tests { let collection_url = "https://example.com/api"; // Set empty records - storage.set_records(collection_url, &Vec::::default())?; + storage.insert_collection_content( + collection_url, + &Vec::::default(), + 42, + CollectionMetadata::default(), + )?; // Get records let fetched_records = storage.get_records(collection_url)?; @@ -332,7 +368,7 @@ mod tests { // Get last modified timestamp when no records let last_modified = storage.get_last_modified_timestamp(collection_url)?; - assert_eq!(last_modified, Some(0)); + assert_eq!(last_modified, Some(42)); Ok(()) } @@ -519,7 +555,12 @@ mod tests { .expect("No attachment metadata for record"); // Set records and attachment - storage.set_records(collection_url, &records)?; + storage.insert_collection_content( + collection_url, + &records, + 42, + CollectionMetadata::default(), + )?; storage.set_attachment(collection_url, &metadata.location, attachment)?; // Verify they are stored @@ -568,8 +609,12 @@ mod tests { }]; // Set records for collection_url1 - storage.set_records(collection_url1, &records_collection_url1)?; - + storage.insert_collection_content( + collection_url1, + &records_collection_url1, + 42, + CollectionMetadata::default(), + )?; // Verify records for collection_url1 let fetched_records = storage.get_records(collection_url1)?; assert!(fetched_records.is_some()); @@ -578,7 +623,12 @@ mod tests { assert_eq!(fetched_records, records_collection_url1); // Set records for collection_url2, which will clear records for all collections - storage.set_records(collection_url2, &records_collection_url2)?; + storage.insert_collection_content( + collection_url2, + &records_collection_url2, + 300, + CollectionMetadata::default(), + )?; // Verify that records for collection_url1 have been cleared let fetched_records = storage.get_records(collection_url1)?; @@ -595,13 +645,13 @@ mod tests { let last_modified1 = storage.get_last_modified_timestamp(collection_url1)?; assert_eq!(last_modified1, None); let last_modified2 = storage.get_last_modified_timestamp(collection_url2)?; - assert_eq!(last_modified2, Some(200)); + assert_eq!(last_modified2, Some(300)); Ok(()) } #[test] - fn test_storage_set_records() -> Result<()> { + fn test_storage_insert_collection_content() -> Result<()> { let mut storage = Storage::new(":memory:".into())?; let collection_url = "https://example.com/api"; @@ -617,7 +667,12 @@ mod tests { }]; // Set initial records - storage.set_records(collection_url, &initial_records)?; + storage.insert_collection_content( + collection_url, + &initial_records, + 42, + CollectionMetadata::default(), + )?; // Verify initial records let fetched_records = storage.get_records(collection_url)?; @@ -635,7 +690,12 @@ mod tests { .unwrap() .clone(), }]; - storage.set_records(collection_url, &updated_records)?; + storage.insert_collection_content( + collection_url, + &updated_records, + 300, + CollectionMetadata::default(), + )?; // Verify updated records let fetched_records = storage.get_records(collection_url)?; @@ -644,7 +704,7 @@ mod tests { // Verify last modified timestamp let last_modified = storage.get_last_modified_timestamp(collection_url)?; - assert_eq!(last_modified, Some(200)); + assert_eq!(last_modified, Some(300)); Ok(()) } @@ -738,14 +798,24 @@ mod tests { ]; // Set initial records - storage.set_records(collection_url, &initial_records)?; + storage.insert_collection_content( + collection_url, + &initial_records, + 1000, + CollectionMetadata::default(), + )?; // Verify initial records let fetched_records = storage.get_records(collection_url)?.unwrap(); assert_eq!(fetched_records, initial_records); // Update records - storage.merge_records(collection_url, &updated_records)?; + storage.insert_collection_content( + collection_url, + &updated_records, + 1300, + CollectionMetadata::default(), + )?; // Verify updated records let mut fetched_records = storage.get_records(collection_url)?.unwrap(); @@ -755,6 +825,42 @@ mod tests { // Verify last modified timestamp let last_modified = storage.get_last_modified_timestamp(collection_url)?; assert_eq!(last_modified, Some(1300)); + Ok(()) + } + #[test] + fn test_storage_get_collection_metadata() -> Result<()> { + let mut storage = Storage::new(":memory:".into())?; + + let collection_url = "https://example.com/api"; + let initial_records = vec![RemoteSettingsRecord { + id: "2".to_string(), + last_modified: 200, + deleted: false, + attachment: None, + fields: serde_json::json!({"key": "value2"}) + .as_object() + .unwrap() + .clone(), + }]; + + // Set initial records + storage.insert_collection_content( + collection_url, + &initial_records, + 1337, + CollectionMetadata { + bucket: "main".into(), + signature: CollectionSignature { + signature: "b64encodedsig".into(), + x5u: "http://15u/".into(), + }, + }, + )?; + + let metadata = storage.get_collection_metadata(collection_url)?.unwrap(); + + assert_eq!(metadata.signature.signature, "b64encodedsig"); + assert_eq!(metadata.signature.x5u, "http://15u/"); Ok(()) }