Skip to content

Commit

Permalink
Remote Settings: store full changesets instead of just records
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem committed Jan 7, 2025
1 parent 7d70184 commit d305778
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 84 deletions.
150 changes: 116 additions & 34 deletions components/remote_settings/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,15 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
.get_last_modified_timestamp(&collection_url)?
.unwrap_or(0);
if packaged_data.timestamp > cached_timestamp {
inner
.storage
.set_records(&collection_url, &packaged_data.data)?;
// Remove previously cached data (packaged data does not have tombstones like diff responses do).
inner.storage.empty()?;
// Insert new packaged data.
inner.storage.insert_collection_content(
&collection_url,
&packaged_data.data,
packaged_data.timestamp,
CollectionMetadata::default(),
)?;
return Ok(Some(self.filter_records(packaged_data.data)));
}
}
Expand All @@ -168,9 +174,14 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
(Some(cached_records), _) => Some(self.filter_records(cached_records)),
// Case 3: sync_if_empty=true
(None, true) => {
let records = inner.api_client.get_records(None)?;
inner.storage.set_records(&collection_url, &records)?;
Some(self.filter_records(records))
let changeset = inner.api_client.fetch_changeset(None)?;
inner.storage.insert_collection_content(
&collection_url,
&changeset.changes,
changeset.timestamp,
changeset.metadata,
)?;
Some(self.filter_records(changeset.changes))
}
// Case 4: Nothing to return
(None, false) => None,
Expand All @@ -181,8 +192,13 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
let mut inner = self.inner.lock();
let collection_url = inner.api_client.collection_url();
let mtime = inner.storage.get_last_modified_timestamp(&collection_url)?;
let records = inner.api_client.get_records(mtime)?;
inner.storage.merge_records(&collection_url, &records)
let changeset = inner.api_client.fetch_changeset(mtime)?;
inner.storage.insert_collection_content(
&collection_url,
&changeset.changes,
changeset.timestamp,
changeset.metadata,
)
}

/// Downloads an attachment from [attachment_location]. NOTE: there are no guarantees about a
Expand Down Expand Up @@ -221,7 +237,7 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
}

// Try to download the attachment because neither the storage nor the local data had it
let attachment = inner.api_client.get_attachment(&metadata.location)?;
let attachment = inner.api_client.fetch_attachment(&metadata.location)?;

// Verify downloaded data
if attachment.len() as u64 != metadata.size {
Expand Down Expand Up @@ -284,10 +300,10 @@ pub trait ApiClient {
fn collection_url(&self) -> String;

/// Fetch records from the server
fn get_records(&mut self, timestamp: Option<u64>) -> Result<Vec<RemoteSettingsRecord>>;
fn fetch_changeset(&mut self, timestamp: Option<u64>) -> Result<ChangesetResponse>;

/// Fetch an attachment from the server
fn get_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>>;
fn fetch_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>>;

/// Check if this client is pointing to the production server
fn is_prod_server(&self) -> Result<bool>;
Expand Down Expand Up @@ -372,7 +388,7 @@ impl ApiClient for ViaductApiClient {
self.endpoints.collection_url.to_string()
}

fn get_records(&mut self, timestamp: Option<u64>) -> Result<Vec<RemoteSettingsRecord>> {
fn fetch_changeset(&mut self, timestamp: Option<u64>) -> Result<ChangesetResponse> {
let mut url = self.endpoints.changeset_url.clone();
// 0 is used as an arbitrary value for `_expected` because the current implementation does
// not leverage push timestamps or polling from the monitor/changes endpoint. More
Expand All @@ -388,7 +404,7 @@ impl ApiClient for ViaductApiClient {
let resp = self.make_request(url)?;

if resp.is_success() {
Ok(resp.json::<ChangesetResponse>()?.changes)
Ok(resp.json::<ChangesetResponse>()?)
} else {
Err(Error::ResponseError(format!(
"status code: {}",
Expand All @@ -397,7 +413,7 @@ impl ApiClient for ViaductApiClient {
}
}

fn get_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>> {
fn fetch_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>> {
let attachments_base_url = match &self.remote_state.attachments_base_url {
Some(attachments_base_url) => attachments_base_url.to_owned(),
None => {
Expand Down Expand Up @@ -706,9 +722,24 @@ struct RecordsResponse {
data: Vec<RemoteSettingsRecord>,
}

#[derive(Deserialize, Serialize)]
struct ChangesetResponse {
#[derive(Clone, Deserialize, Serialize)]
pub struct ChangesetResponse {
changes: Vec<RemoteSettingsRecord>,
timestamp: u64,
metadata: CollectionMetadata,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct CollectionMetadata {
pub bucket: String,
pub signature: CollectionSignature,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct CollectionSignature {
pub signature: String,
/// X.509 certificate chain Url (x5u)
pub x5u: String,
}

/// A parsed Remote Settings record. Records can contain arbitrary fields, so clients
Expand All @@ -717,6 +748,7 @@ struct ChangesetResponse {
pub struct RemoteSettingsRecord {
pub id: String,
pub last_modified: u64,
/// Tombstone flag (see https://remote-settings.readthedocs.io/en/latest/client-specifications.html#local-state)
#[serde(default)]
pub deleted: bool,
pub attachment: Option<Attachment>,
Expand Down Expand Up @@ -1705,14 +1737,24 @@ mod test_new_client {
attachment: None,
fields: json!({"foo": "bar"}).as_object().unwrap().clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata {
bucket: "main".into(),
signature: CollectionSignature {
signature: "b64sig".into(),
x5u: "http://x5u.com".into(),
},
},
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand Down Expand Up @@ -1748,14 +1790,19 @@ mod jexl_tests {
.unwrap()
.clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata::default(),
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
let changeset = changeset.clone();
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand All @@ -1766,9 +1813,11 @@ mod jexl_tests {
};

let mut storage = Storage::new(":memory:".into()).expect("Error creating storage");
let _ = storage.set_records(
let _ = storage.insert_collection_content(
"http://rs.example.com/v1/buckets/main/collections/test-collection",
&records,
42,
CollectionMetadata::default(),
);

let rs_client = RemoteSettingsClient::new_from_parts(
Expand Down Expand Up @@ -1799,14 +1848,19 @@ mod jexl_tests {
.unwrap()
.clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata::default(),
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
let changeset = changeset.clone();
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand All @@ -1817,9 +1871,11 @@ mod jexl_tests {
};

let mut storage = Storage::new(":memory:".into()).expect("Error creating storage");
let _ = storage.set_records(
let _ = storage.insert_collection_content(
"http://rs.example.com/v1/buckets/main/collections/test-collection",
&records,
42,
CollectionMetadata::default(),
);

let rs_client = RemoteSettingsClient::new_from_parts(
Expand Down Expand Up @@ -1902,7 +1958,12 @@ mod cached_data_tests {

let mut api_client = MockApiClient::new();
let mut storage = Storage::new(":memory:".into())?;
storage.set_records(collection_url, &vec![old_record.clone()])?;
storage.insert_collection_content(
collection_url,
&vec![old_record.clone()],
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -1962,10 +2023,21 @@ mod cached_data_tests {
attachment: None,
fields: serde_json::Map::new(),
}];
let changeset = ChangesetResponse {
changes: expected_records.clone(),
timestamp: 42,
metadata: CollectionMetadata {
bucket: "main".into(),
signature: CollectionSignature {
signature: "b64sig".into(),
x5u: "http://x5u.com".into(),
},
},
};
api_client
.expect_get_records()
.expect_fetch_changeset()
.withf(|timestamp| timestamp.is_none())
.returning(move |_| Ok(expected_records.clone()));
.returning(move |_| Ok(changeset.clone()));

let rs_client =
RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client);
Expand Down Expand Up @@ -2012,7 +2084,7 @@ mod cached_data_tests {
api_client.expect_is_prod_server().returning(|| Ok(true));

// Since sync_if_empty is false, get_records should not be called
// No need to set expectation for api_client.get_records
// No need to set expectation for api_client.fetch_changeset

let rs_client =
RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client);
Expand Down Expand Up @@ -2046,7 +2118,12 @@ mod cached_data_tests {
attachment: None,
fields: serde_json::Map::new(),
}];
storage.set_records(&collection_url, &cached_records)?;
storage.insert_collection_content(
&collection_url,
&cached_records,
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -2082,7 +2159,12 @@ mod cached_data_tests {

// Set up empty cached records
let cached_records: Vec<RemoteSettingsRecord> = vec![];
storage.set_records(&collection_url, &cached_records)?;
storage.insert_collection_content(
&collection_url,
&cached_records,
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -2210,7 +2292,7 @@ mod test_packaged_metadata {
.returning(move || collection_url.clone());
api_client.expect_is_prod_server().returning(|| Ok(true));
api_client
.expect_get_attachment()
.expect_fetch_attachment()
.returning(move |_| Ok(mock_api_data.clone()));

let rs_client =
Expand Down
15 changes: 12 additions & 3 deletions components/remote_settings/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sql_support::open_database::{self, ConnectionInitializer};
/// 1. Bump this version.
/// 2. Add a migration from the old version to the new version in
/// [`RemoteSettingsConnectionInitializer::upgrade_from`].
pub const VERSION: u32 = 1;
pub const VERSION: u32 = 2;

/// The current remote settings database schema.
pub const SQL: &str = r#"
Expand All @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS attachments (
data BLOB NOT NULL);
CREATE TABLE IF NOT EXISTS collection_metadata (
collection_url TEXT PRIMARY KEY,
last_modified INTEGER);
last_modified INTEGER, bucket TEXT, signature TEXT, x5u TEXT);
"#;

/// Initializes an SQLite connection to the Remote Settings database, performing
Expand All @@ -37,7 +37,7 @@ pub struct RemoteSettingsConnectionInitializer;

impl ConnectionInitializer for RemoteSettingsConnectionInitializer {
const NAME: &'static str = "remote_settings";
const END_VERSION: u32 = 1;
const END_VERSION: u32 = 2;

fn prepare(&self, conn: &Connection, _db_empty: bool) -> open_database::Result<()> {
let initial_pragmas = "
Expand All @@ -63,6 +63,15 @@ impl ConnectionInitializer for RemoteSettingsConnectionInitializer {
tx.execute("ALTER TABLE collection_metadata DROP column fetched", ())?;
Ok(())
}
1 => {
tx.execute("ALTER TABLE collection_metadata ADD COLUMN bucket TEXT", ())?;
tx.execute(
"ALTER TABLE collection_metadata ADD COLUMN signature TEXT",
(),
)?;
tx.execute("ALTER TABLE collection_metadata ADD COLUMN x5u TEXT", ())?;
Ok(())
}
_ => Err(open_database::Error::IncompatibleVersion(version)),
}
}
Expand Down
Loading

0 comments on commit d305778

Please sign in to comment.