Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

read_topic API #288

Merged
merged 32 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
db813ba
Add key_store table and generate
richardhuaaa Oct 24, 2023
f9c6ec8
Implement store/fetch/delete traits
richardhuaaa Oct 24, 2023
257d5b6
Add SQL key store
richardhuaaa Oct 24, 2023
6af1384
Use SQL key store for provider, remove in-memory key store
richardhuaaa Oct 25, 2023
a404c20
Use reference to encrypted store to allow multiple consumers
richardhuaaa Oct 25, 2023
acd42b8
Tidy identity
richardhuaaa Oct 25, 2023
0a53a11
Add identity table, queries, and unit tests
richardhuaaa Oct 26, 2023
4fb1059
Persist and retrieve identity from storage inside builder
richardhuaaa Oct 26, 2023
598b5ea
Tidy up names and types
richardhuaaa Oct 26, 2023
c8780f0
Merge branch 'main' into rich/sqlite-store
neekolas Oct 27, 2023
9dad361
Update Diesel
neekolas Oct 27, 2023
b1f411b
Hard code ciphersuite
neekolas Oct 27, 2023
091afdb
Add tests and fix bugs
richardhuaaa Oct 27, 2023
18c6aa4
Fix lints
richardhuaaa Oct 27, 2023
dae1c74
Update xmtp_mls/src/storage/encrypted_store/mod.rs
richardhuaaa Oct 27, 2023
77aa94b
Dont use the word Error in error names
richardhuaaa Oct 27, 2023
5278d13
Move identity initialization to identity strategy
richardhuaaa Oct 28, 2023
a6c5095
Merge remote-tracking branch 'origin/main' into rich/sqlite-store
richardhuaaa Oct 28, 2023
04f5dd9
Add create key packages methods
neekolas Oct 28, 2023
7a27e56
Merge branch 'main' into create-key-packages
neekolas Oct 28, 2023
7000fb4
Merge branch 'rich/sqlite-store' of github.com:xmtp/libxmtp into rich…
neekolas Oct 28, 2023
2b4b0be
Merge branch 'rich/sqlite-store' into create-key-packages
neekolas Oct 28, 2023
34d42f0
Add verified key package handling
neekolas Oct 29, 2023
249daba
Add comment
neekolas Oct 29, 2023
49f8332
Add read_topic API
neekolas Oct 29, 2023
23fc060
Remove unused lifetime
neekolas Oct 29, 2023
c632b9f
Merge branch 'create-key-packages' into nmolnar/read-topic-api
neekolas Oct 29, 2023
77f1829
Add times check
neekolas Oct 29, 2023
ddd91af
Merge branch 'main' into nmolnar/read-topic-api
neekolas Oct 30, 2023
1a26d8f
Merge branch 'main' into nmolnar/read-topic-api
neekolas Oct 31, 2023
c58ed28
Tidy up cursor extraction
neekolas Oct 31, 2023
292a55d
Avoid unnecessary clones
neekolas Nov 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 182 additions & 10 deletions xmtp_mls/src/api_client_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
use std::collections::HashMap;

use xmtp_proto::{
api_client::{Error as ApiError, ErrorKind, XmtpApiClient, XmtpMlsClient},
api_client::{
Envelope, Error as ApiError, ErrorKind, PagingInfo, QueryRequest, XmtpApiClient,
XmtpMlsClient,
},
xmtp::{
message_api::v3::GetIdentityUpdatesRequest,
message_api::{v1::Cursor, v3::GetIdentityUpdatesRequest},
mls::message_contents::{
group_message::{Version as GroupMessageVersion, V1 as GroupMessageV1},
welcome_message::{Version as WelcomeMessageVersion, V1 as WelcomeMessageV1},
WelcomeMessage as WelcomeMessageProto,
},
},
xmtp::{
message_api::v3::{
get_identity_updates_response::update::Kind as UpdateKind,
publish_welcomes_request::WelcomeMessageRequest, ConsumeKeyPackagesRequest,
KeyPackageUpload, PublishToGroupRequest, PublishWelcomesRequest,
RegisterInstallationRequest, UploadKeyPackagesRequest,
message_api::{
v1::SortDirection,
v3::{
get_identity_updates_response::update::Kind as UpdateKind,
publish_welcomes_request::WelcomeMessageRequest, ConsumeKeyPackagesRequest,
KeyPackageUpload, PublishToGroupRequest, PublishWelcomesRequest,
RegisterInstallationRequest, UploadKeyPackagesRequest,
},
},
mls::message_contents::GroupMessage,
},
Expand All @@ -34,6 +40,46 @@ where
Self { api_client }
}

pub async fn read_topic(
&self,
topic: &str,
start_time_ns: u64,
) -> Result<Vec<Envelope>, ApiError> {
let mut cursor: Option<Cursor> = None;
let mut out: Vec<Envelope> = vec![];
let page_size = 100;
loop {
let mut result = self
.api_client
.query(QueryRequest {
content_topics: vec![topic.to_string()],
start_time_ns,
end_time_ns: 0,
paging_info: Some(PagingInfo {
cursor,
limit: page_size,
direction: SortDirection::Ascending as i32,
}),
})
.await?;

let num_envelopes = result.envelopes.len();
out.append(&mut result.envelopes);

if num_envelopes < page_size as usize || result.paging_info.is_none() {
break;
}

cursor = result.paging_info.expect("Empty paging info").cursor;

if cursor.is_none() {
break;
}
}

Ok(out)
}

pub async fn register_installation(
&self,
last_resort_key_package: Vec<u8>,
Expand Down Expand Up @@ -228,7 +274,10 @@ type IdentityUpdatesMap = HashMap<String, Vec<IdentityUpdate>>;
mod tests {
use super::ApiClientWrapper;
use mockall::mock;
use xmtp_proto::api_client::{Error, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient};
use xmtp_proto::api_client::{
Error, PagingInfo, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient,
};
use xmtp_proto::xmtp::message_api::v1::IndexCursor;
use xmtp_proto::xmtp::message_api::v3::consume_key_packages_response::KeyPackage;
use xmtp_proto::xmtp::message_api::v3::get_identity_updates_response::update::Kind as UpdateKind;
use xmtp_proto::xmtp::message_api::v3::get_identity_updates_response::{
Expand All @@ -241,12 +290,24 @@ mod tests {
};

use xmtp_proto::xmtp::message_api::v1::{
BatchQueryRequest, BatchQueryResponse, Envelope, PublishRequest, PublishResponse,
QueryRequest, QueryResponse, SubscribeRequest,
cursor::Cursor as InnerCursor, BatchQueryRequest, BatchQueryResponse, Cursor, Envelope,
PublishRequest, PublishResponse, QueryRequest, QueryResponse, SubscribeRequest,
};

use async_trait::async_trait;

fn build_envelopes(num_envelopes: usize, topic: &str) -> Vec<Envelope> {
let mut out: Vec<Envelope> = vec![];
for i in 0..num_envelopes {
out.push(Envelope {
content_topic: topic.to_string(),
message: vec![i as u8],
timestamp_ns: i as u64,
})
}
out
}

mock! {
pub Subscription {}

Expand Down Expand Up @@ -442,4 +503,115 @@ mod tests {
}
}
}

#[tokio::test]
async fn test_read_topic_single_page() {
let mut mock_api = MockApiClient::new();
let topic = "topic";
let start_time_ns = 10;
// Set expectation for first request with no cursor
mock_api.expect_query().returning(move |req| {
assert_eq!(req.content_topics[0], topic);

Ok(QueryResponse {
paging_info: Some(PagingInfo {
cursor: None,
limit: 100,
direction: 0,
}),
envelopes: build_envelopes(10, topic),
})
});

let wrapper = ApiClientWrapper::new(mock_api);

let result = wrapper.read_topic(topic, start_time_ns).await.unwrap();
assert_eq!(result.len(), 10);
}

#[tokio::test]
async fn test_read_topic_single_page_exactly_100_results() {
let mut mock_api = MockApiClient::new();
let topic = "topic";
let start_time_ns = 10;
// Set expectation for first request with no cursor
mock_api.expect_query().times(1).returning(move |req| {
assert_eq!(req.content_topics[0], topic);

Ok(QueryResponse {
paging_info: Some(PagingInfo {
cursor: None,
limit: 100,
direction: 0,
}),
envelopes: build_envelopes(100, topic),
})
});

let wrapper = ApiClientWrapper::new(mock_api);

let result = wrapper.read_topic(topic, start_time_ns).await.unwrap();
assert_eq!(result.len(), 100);
}

#[tokio::test]
async fn test_read_topic_multi_page() {
let mut mock_api = MockApiClient::new();
let topic = "topic";
let start_time_ns = 10;
// Set expectation for first request with no cursor
mock_api
.expect_query()
.withf(move |req| match req.paging_info.clone() {
Some(paging_info) => match paging_info.cursor {
Some(_) => false,
None => true,
},
None => true,
} && req.start_time_ns == 10)
.returning(move |req| {
assert_eq!(req.content_topics[0], topic);

Ok(QueryResponse {
paging_info: Some(PagingInfo {
cursor: Some(Cursor {
cursor: Some(InnerCursor::Index(IndexCursor {
digest: vec![],
sender_time_ns: 0,
})),
}),
limit: 100,
direction: 0,
}),
envelopes: build_envelopes(100, topic),
})
});
// Set expectation for requests with a cursor
mock_api
.expect_query()
.withf(|req| match req.paging_info.clone() {
Some(paging_info) => match paging_info.cursor {
Some(_) => true,
None => false,
},
None => false,
})
.returning(move |req| {
assert_eq!(req.content_topics[0], topic);

Ok(QueryResponse {
paging_info: Some(PagingInfo {
cursor: None,
limit: 100,
direction: 0,
}),
envelopes: build_envelopes(100, topic),
})
});

let wrapper = ApiClientWrapper::new(mock_api);

let result = wrapper.read_topic(topic, start_time_ns).await.unwrap();
assert_eq!(result.len(), 200);
}
}
2 changes: 1 addition & 1 deletion xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ mod tests {
let wallet = generate_local_wallet();
let wallet_address = wallet.get_address();
let client = ClientBuilder::new_test_client(wallet.clone().into()).await;

client.register_identity().await.unwrap();
client.top_up_key_packages().await.unwrap();

Expand Down
Loading