diff --git a/ipa-core/src/config.rs b/ipa-core/src/config.rs index 50bd90f4b..30d1b70a8 100644 --- a/ipa-core/src/config.rs +++ b/ipa-core/src/config.rs @@ -532,8 +532,11 @@ mod tests { }; const URI_1: &str = "http://localhost:3000"; + const URI_1S: &str = "http://localhost:6000"; const URI_2: &str = "http://localhost:3001"; + const URI_2S: &str = "http://localhost:6001"; const URI_3: &str = "http://localhost:3002"; + const URI_3S: &str = "http://localhost:6002"; #[test] fn parse_config() { @@ -541,18 +544,27 @@ mod tests { let uri1 = URI_1.parse::().unwrap(); let id1 = HelperIdentity::try_from(1usize).unwrap(); - let value1 = &conf.network.peers()[id1]; - assert_eq!(value1.url, uri1); + let ring_value1 = &conf.leaders_ring().network.peers()[id1]; + assert_eq!(ring_value1.url, uri1); + let uri1s = URI_1S.parse::().unwrap(); + let sharding_value1 = conf.get_shards_for_helper(id1).network.get_peer(0).unwrap(); + assert_eq!(sharding_value1.url, uri1s); let uri2 = URI_2.parse::().unwrap(); let id2 = HelperIdentity::try_from(2usize).unwrap(); - let value2 = &conf.network.peers()[id2]; - assert_eq!(value2.url, uri2); + let ring_value2 = &conf.leaders_ring().network.peers()[id2]; + assert_eq!(ring_value2.url, uri2); + let uri2s = URI_2S.parse::().unwrap(); + let sharding_value2 = conf.get_shards_for_helper(id2).network.get_peer(0).unwrap(); + assert_eq!(sharding_value2.url, uri2s); let uri3 = URI_3.parse::().unwrap(); let id3 = HelperIdentity::try_from(3usize).unwrap(); - let value3 = &conf.network.peers()[id3]; - assert_eq!(value3.url, uri3); + let ring_value3 = &conf.leaders_ring().network.peers()[id3]; + assert_eq!(ring_value3.url, uri3); + let uri3s = URI_3S.parse::().unwrap(); + let sharding_value3 = conf.get_shards_for_helper(id3).network.get_peer(0).unwrap(); + assert_eq!(sharding_value3.url, uri3s); } #[test] diff --git a/ipa-core/src/net/client/mod.rs b/ipa-core/src/net/client/mod.rs index 3693431f2..0b47fb03e 100644 --- a/ipa-core/src/net/client/mod.rs +++ b/ipa-core/src/net/client/mod.rs @@ -25,7 +25,7 @@ use pin_project::pin_project; use rustls::RootCertStore; use tracing::error; -use super::{ConnectionFlavor, Helper}; +use super::{ConnectionFlavor, Helper, Shard}; use crate::{ config::{ ClientConfig, HyperClientConfigurator, NetworkConfig, OwnedCertificate, OwnedPrivateKey, @@ -469,6 +469,33 @@ impl MpcHelperClient { } } +impl MpcHelperClient { + /// This is a mirror of [`MpcHelperClient::from_config`] but for Shards. This creates + /// set of Shard clients in the supplied helper network configuration, which can be used to + /// talk to each of the shards in this helper. + /// + /// `identity` configures whether and how the client will authenticate to the server. It is for + /// the shard making the calls, so the same one is used for all three of the clients. + #[must_use] + #[allow(clippy::missing_panics_doc)] + pub fn shards_from_conf( + runtime: &IpaRuntime, + conf: &NetworkConfig, + identity: &ClientIdentity, + ) -> Vec { + conf.peers_iter() + .map(|peer_conf| { + Self::new( + runtime.clone(), + &conf.client, + peer_conf.clone(), + identity.clone_with_key(), + ) + }) + .collect() + } +} + fn make_http_connector() -> HttpConnector { let mut connector = HttpConnector::new(); // IPA uses HTTP2 and it is sensitive to those delays especially in high-latency network diff --git a/ipa-core/src/net/mod.rs b/ipa-core/src/net/mod.rs index e0fdca35a..c71c609ea 100644 --- a/ipa-core/src/net/mod.rs +++ b/ipa-core/src/net/mod.rs @@ -115,15 +115,15 @@ pub fn parse_certificate_and_private_key_bytes( mod tests { use std::io::ErrorKind; - use crate::net::test; + use super::test::get_test_certificate_and_key; + use crate::sharding::ShardedHelperIdentity; const NOTHING: &[u8] = b" "; const GARBAGE: &[u8] = b"ksjdhfskjdfhsdf"; #[test] fn parse_cert_pk_happy_path() { - let mut c = test::TEST_CERTS[0]; - let mut pk = test::TEST_KEYS[0]; + let (mut c, mut pk) = get_test_certificate_and_key(ShardedHelperIdentity::ONE_FIRST); super::parse_certificate_and_private_key_bytes(&mut c, &mut pk).unwrap(); } @@ -131,7 +131,7 @@ mod tests { #[should_panic(expected = "No certificates found")] fn parse_cert_pk_no_cert() { let mut c = NOTHING; - let mut pk = test::TEST_KEYS[0]; + let (_, mut pk) = get_test_certificate_and_key(ShardedHelperIdentity::ONE_FIRST); let r = super::parse_certificate_and_private_key_bytes(&mut c, &mut pk); assert_eq!(r.as_ref().unwrap_err().kind(), ErrorKind::Other); r.unwrap(); @@ -140,7 +140,7 @@ mod tests { #[test] #[should_panic(expected = "No private key")] fn parse_cert_pk_no_pk() { - let mut c = test::TEST_CERTS[0]; + let (mut c, _) = get_test_certificate_and_key(ShardedHelperIdentity::ONE_FIRST); let mut pk = NOTHING; let r = super::parse_certificate_and_private_key_bytes(&mut c, &mut pk); assert_eq!(r.as_ref().unwrap_err().kind(), ErrorKind::Other); diff --git a/ipa-core/src/net/test.rs b/ipa-core/src/net/test.rs index e62bccce6..e39c0c109 100644 --- a/ipa-core/src/net/test.rs +++ b/ipa-core/src/net/test.rs @@ -9,46 +9,172 @@ #![allow(clippy::missing_panics_doc)] use std::{ - array, + collections::HashSet, net::{SocketAddr, TcpListener}, + ops::Index, }; use once_cell::sync::Lazy; use rustls_pki_types::CertificateDer; -use super::transport::MpcHttpTransport; +use super::{transport::MpcHttpTransport, ConnectionFlavor, Shard}; use crate::{ config::{ ClientConfig, HpkeClientConfig, HpkeServerConfig, NetworkConfig, PeerConfig, ServerConfig, TlsConfig, }, executor::{IpaJoinHandle, IpaRuntime}, - helpers::{HandlerBox, HelperIdentity, RequestHandler}, + helpers::{HandlerBox, HelperIdentity, RequestHandler, TransportIdentity}, hpke::{Deserializable as _, IpaPublicKey}, net::{ClientIdentity, Helper, MpcHelperClient, MpcHelperServer}, + sharding::{ShardIndex, ShardedHelperIdentity}, sync::Arc, test_fixture::metrics::MetricsHandle, }; -pub const DEFAULT_TEST_PORTS: [u16; 3] = [3000, 3001, 3002]; +/// Simple struct to keep default port configuration organized. +#[derive(Clone)] +pub struct Ports { + ring: [u16; 3], + shards: [u16; 3], +} -pub struct TestConfig { - pub disable_https: bool, - pub network: NetworkConfig, - pub servers: [ServerConfig; 3], - pub sockets: Option<[TcpListener; 3]>, +/// A **single** ring with 3 hosts, each with a ring and sharding port. +pub const DEFAULT_TEST_PORTS: Ports = Ports { + ring: [3000, 3001, 3002], + shards: [6000, 6001, 6002], +}; + +/// Configuration of a server that can be reached via socket or port. +pub struct AddressableTestServer { + /// The identity of this server in the network. + pub id: ShardedHelperIdentity, + /// Contains the ports + pub config: ServerConfig, + /// Sockets are created if no port was specified. + pub socket: Option, } -impl TestConfig { +/// Creates a new socket from the OS if no port is given. +fn create_port(optional_port: Option) -> (Option, u16) { + if let Some(port) = optional_port { + (None, port) + } else { + let socket = TcpListener::bind("localhost:0").unwrap(); + let port = socket.local_addr().unwrap().port(); + (Some(socket), port) + } +} + +impl AddressableTestServer { + /// Creates a new Test Server with the given Id. If no port is given, one will be obtained from + /// the OS. + fn new( + id: ShardedHelperIdentity, + optional_port: Option, + conf: &TestConfigBuilder, + ) -> Self { + let (socket, port) = create_port(optional_port); + let config = if conf.disable_https { + server_config_insecure_http(port, !conf.disable_matchkey_encryption) + } else { + server_config_https(id, port, !conf.disable_matchkey_encryption) + }; + Self { id, config, socket } + } +} + +/// Either a single Ring on MPC connection or all of the shards in a Helper. +pub struct TestNetwork { + pub network: NetworkConfig, // Contains Clients config + pub servers: Vec, +} + +impl TestNetwork { + /// Helper function that creates [`PeerConfig`] + fn create_peers( + servers: &[AddressableTestServer], + conf: &TestConfigBuilder, + ) -> Vec { + servers + .iter() + .map(|addr_server| { + let port = addr_server + .config + .port + .expect("Port should have been defined already"); + let (scheme, certificate) = if conf.disable_https { + ("http", None) + } else { + ("https", Some(TEST_CERTS_DER[addr_server.id].clone())) + }; + let url = format!("{scheme}://localhost:{port}").parse().unwrap(); + let hpke_config = if conf.disable_matchkey_encryption { + None + } else { + Some(HpkeClientConfig::new( + IpaPublicKey::from_bytes( + &hex::decode(TEST_HPKE_PUBLIC_KEY.trim()).unwrap(), + ) + .unwrap(), + )) + }; + PeerConfig { + url, + certificate, + hpke_config, + } + }) + .collect() + } +} + +impl TestNetwork { #[must_use] - pub fn builder() -> TestConfigBuilder { - TestConfigBuilder::default() + /// Gets a ref to the first shard in this network. + pub fn get_first_shard(&self) -> &AddressableTestServer { + self.servers.first().unwrap() + } + + /// Gets a mut ref to the first shard in this network. + pub fn get_first_shard_mut(&mut self) -> &mut AddressableTestServer { + self.servers.get_mut(0).unwrap() } } -impl Default for TestConfig { - fn default() -> Self { - Self::builder().build() +impl TestNetwork { + /// Creates 3 mpc test servers and creates a network. + fn new_mpc(ix: ShardIndex, ports: Vec>, conf: &TestConfigBuilder) -> Self { + let servers: Vec<_> = HelperIdentity::make_three() + .into_iter() + .zip(ports) + .map(|(id, p)| { + let sid = ShardedHelperIdentity::new(id, ix); + AddressableTestServer::new(sid, p, conf) + }) + .collect(); + let peers = Self::create_peers(servers.as_slice(), conf); + let client_config = conf.create_client_config(); + let network = NetworkConfig::::new_mpc(peers, client_config); + TestNetwork { network, servers } + } +} + +impl TestNetwork { + /// Creates all the shards for a helper and creates a network. + fn new_shards(id: HelperIdentity, ports: Vec>, conf: &TestConfigBuilder) -> Self { + let servers: Vec<_> = (0..conf.shard_count) + .map(ShardIndex) + .zip(ports) + .map(|(ix, p)| { + let sid = ShardedHelperIdentity::new(id, ix); + AddressableTestServer::new(sid, p, conf) + }) + .collect(); + let peers = Self::create_peers(servers.as_slice(), conf); + let client_config = conf.create_client_config(); + let network = NetworkConfig::::new_shards(peers, client_config); + TestNetwork { network, servers } } } @@ -74,8 +200,8 @@ fn server_config_insecure_http(port: u16, matchkey_encryption: bool) -> ServerCo } #[must_use] -pub fn server_config_https( - id: HelperIdentity, +fn server_config_https( + id: ShardedHelperIdentity, port: u16, matchkey_encryption: bool, ) -> ServerConfig { @@ -91,30 +217,102 @@ pub fn server_config_https( } } -#[derive(Default)] +/// Uber container for test configuration. Provides access to a vec of MPC rings and 3 sharding +/// networks (one for each Helper) +pub struct TestConfig { + pub disable_https: bool, + pub rings: Vec>, + pub shards: Vec>, +} + +impl TestConfig { + /// Gets a ref to the first ring in the network. This ring is important because it's the one + /// that's reached out by the report collector on behalf of all the shards in the helper. + #[must_use] + pub fn leaders_ring(&self) -> &TestNetwork { + &self.rings[0] + } + + /// Gets a ref to the entire shard network for a specific helper. + #[must_use] + pub fn get_shards_for_helper(&self, id: HelperIdentity) -> &TestNetwork { + self.shards.get(id.as_index()).unwrap() + } + + /// Gets a mut ref to the entire shard network for a specific helper. + pub fn get_shards_for_helper_mut(&mut self, id: HelperIdentity) -> &mut TestNetwork { + self.shards.get_mut(id.as_index()).unwrap() + } + + /// Creates a new [`TestConfig`] using the provided configuration. + fn new(conf: &TestConfigBuilder) -> Self { + let rings = (0..conf.shard_count) + .map(ShardIndex) + .map(|s| { + let ports = conf.get_ports_for_shard_index(s); + TestNetwork::::new_mpc(s, ports, conf) + }) + .collect(); + let shards = HelperIdentity::make_three() + .into_iter() + .map(|id| { + let ports = conf.get_ports_for_helper_identity(id); + TestNetwork::::new_shards(id, ports, conf) + }) + .collect(); + Self { + disable_https: conf.disable_https, + rings, + shards, + } + } +} + +impl TestConfig { + #[must_use] + pub fn builder() -> TestConfigBuilder { + TestConfigBuilder::default() + } +} + +impl Default for TestConfig { + fn default() -> Self { + Self::builder().build() + } +} + pub struct TestConfigBuilder { - ports: Option<[u16; 3]>, + /// Can be None, meaning that free ports should be obtained from the operating system. + /// One ring per shard in a helper (see [`shard_count`]). For each ring we need 3 shard + /// (A `Vec`) and 3 mpc ports. + ports_by_ring: Option>, + /// Describes the number of shards per helper. This is directly related to [`ports_by_ring`]. + shard_count: u32, disable_https: bool, use_http1: bool, disable_matchkey_encryption: bool, } -impl TestConfigBuilder { - #[must_use] - pub fn with_http_and_default_test_ports() -> Self { +impl Default for TestConfigBuilder { + /// Non-sharded, HTTPS and get ports from OS. + fn default() -> Self { Self { - ports: Some(DEFAULT_TEST_PORTS), - disable_https: true, + ports_by_ring: None, + shard_count: 1, + disable_https: false, use_http1: false, disable_matchkey_encryption: false, } } +} +impl TestConfigBuilder { #[must_use] - pub fn with_open_ports() -> Self { + pub fn with_http_and_default_test_ports() -> Self { Self { - ports: None, - disable_https: false, + ports_by_ring: Some(vec![DEFAULT_TEST_PORTS]), + shard_count: 1, + disable_https: true, use_http1: false, disable_matchkey_encryption: false, } @@ -126,6 +324,22 @@ impl TestConfigBuilder { self } + /// Sets the ports the test network should use. + /// # Panics + /// If a duplicate port is given. + #[must_use] + pub fn with_ports_by_ring(mut self, value: Vec) -> Self { + self.shard_count = value.len().try_into().unwrap(); + let mut uniqueness_set = HashSet::new(); + for ps in &value { + for p in ps.ring.iter().chain(ps.shards.iter()) { + assert!(uniqueness_set.insert(p), "Found duplicate port {p}"); + } + } + self.ports_by_ring = Some(value); + self + } + #[must_use] pub fn with_use_http1_option(mut self, value: bool) -> Self { self.use_http1 = value; @@ -140,63 +354,41 @@ impl TestConfigBuilder { self } - #[must_use] - pub fn build(self) -> TestConfig { - let mut sockets = None; - let ports = self.ports.unwrap_or_else(|| { - let socks = array::from_fn(|_| TcpListener::bind("localhost:0").unwrap()); - let ports = socks - .each_ref() - .map(|sock| sock.local_addr().unwrap().port()); - sockets = Some(socks); - ports - }); - let (scheme, certs) = if self.disable_https { - ("http", [None, None, None]) + /// Creates a HTTP1 or HTTP2 client config. + pub fn create_client_config(&self) -> ClientConfig { + self.use_http1 + .then(ClientConfig::use_http1) + .unwrap_or_default() + } + + /// Get all the MPC ports in a ring specified by the shard index. + fn get_ports_for_shard_index(&self, ix: ShardIndex) -> Vec> { + if let Some(ports_by_ring) = &self.ports_by_ring { + let ports = ports_by_ring[ix.as_index()].clone(); + ports.ring.into_iter().map(Some).collect() } else { - ("https", TEST_CERTS_DER.clone().map(Some)) - }; - let peers = certs - .into_iter() - .enumerate() - .map(|(i, cert)| PeerConfig { - url: format!("{scheme}://localhost:{}", ports[i]) - .parse() - .unwrap(), - certificate: cert, - hpke_config: if self.disable_matchkey_encryption { - None - } else { - Some(HpkeClientConfig::new( - IpaPublicKey::from_bytes( - &hex::decode(TEST_HPKE_PUBLIC_KEY.trim()).unwrap(), - ) - .unwrap(), - )) - }, - }) - .collect::>(); - let network = NetworkConfig::::new_mpc( - peers, - self.use_http1 - .then(ClientConfig::use_http1) - .unwrap_or_default(), - ); - let servers = if self.disable_https { - ports.map(|ports| server_config_insecure_http(ports, !self.disable_matchkey_encryption)) + vec![None; 3] + } + } + + /// Get all the shard ports in a helper. + fn get_ports_for_helper_identity(&self, id: HelperIdentity) -> Vec> { + if let Some(ports_by_ring) = &self.ports_by_ring { + ports_by_ring + .iter() + .map(|r| Some(r.shards[id.as_index()])) + .collect() } else { - HelperIdentity::make_three() - .map(|id| server_config_https(id, ports[id], !self.disable_matchkey_encryption)) - }; - TestConfig { - network, - servers, - sockets, - disable_https: self.disable_https, + vec![None; self.shard_count.try_into().unwrap()] } } -} + /// Creates a test network with shards. + #[must_use] + pub fn build(&self) -> TestConfig { + TestConfig::new(self) + } +} pub struct TestServer { pub addr: SocketAddr, pub handle: IpaJoinHandle<()>, @@ -270,42 +462,32 @@ impl TestServerBuilder { } pub async fn build(self) -> TestServer { - let identity = if self.disable_https { - ClientIdentity::Header(HelperIdentity::ONE) - } else { - get_test_identity(HelperIdentity::ONE) - }; - let test_config = TestConfig::builder() + let identities = + ClientIdentities::new(self.disable_https, ShardedHelperIdentity::ONE_FIRST); + let mut test_config = TestConfig::builder() .with_disable_https_option(self.disable_https) .with_use_http1_option(self.use_http1) // TODO: add disble_matchkey here .build(); - let TestConfig { - network: network_config, - servers: [server_config, _, _], - sockets: Some([server_socket, _, _]), - .. - } = test_config - else { - panic!("TestConfig should have allocated ports"); - }; + let leaders_ring = test_config.rings.pop().unwrap(); + let first_server = leaders_ring.servers.into_iter().next().unwrap(); let clients = MpcHelperClient::from_conf( &IpaRuntime::current(), - &network_config, - &identity.clone_with_key(), + &leaders_ring.network, + &identities.helper.clone_with_key(), ); let handler = self.handler.as_ref().map(HandlerBox::owning_ref); let client = clients[0].clone(); let (transport, server) = MpcHttpTransport::new( IpaRuntime::current(), HelperIdentity::ONE, - server_config, - network_config.clone(), + first_server.config, + leaders_ring.network.clone(), &clients, handler, ); let (addr, handle) = server - .start_on(&IpaRuntime::current(), Some(server_socket), self.metrics) + .start_on(&IpaRuntime::current(), first_server.socket, self.metrics) .await; TestServer { addr, @@ -318,17 +500,63 @@ impl TestServerBuilder { } } -fn get_test_certificate_and_key(id: HelperIdentity) -> (&'static [u8], &'static [u8]) { +pub struct ClientIdentities { + pub helper: ClientIdentity, + pub shard: ClientIdentity, +} + +impl ClientIdentities { + #[must_use] + pub fn new(disable_https: bool, id: ShardedHelperIdentity) -> Self { + if disable_https { + ClientIdentities { + helper: ClientIdentity::Header(id.helper_identity), + shard: ClientIdentity::Header(id.shard_index), + } + } else { + get_client_test_identity(id) + } + } +} + +impl Index for [&'static [u8]; S] { + type Output = &'static [u8]; + + fn index(&self, index: ShardedHelperIdentity) -> &Self::Output { + let pos = index.as_index(); + self.get(pos) + .unwrap_or_else(|| panic!("The computed index {pos} is outside of {S}")) + } +} + +impl Index for Lazy<[CertificateDer<'static>; S]> { + type Output = CertificateDer<'static>; + + fn index(&self, index: ShardedHelperIdentity) -> &Self::Output { + let pos = index.as_index(); + self.get(pos) + .unwrap_or_else(|| panic!("The computed index {pos} is outside of {S}")) + } +} + +pub(super) fn get_test_certificate_and_key( + id: ShardedHelperIdentity, +) -> (&'static [u8], &'static [u8]) { (TEST_CERTS[id], TEST_KEYS[id]) } +/// Creating a cert client identity. Using the same certificate for both shard and mpc. #[must_use] -pub fn get_test_identity(id: HelperIdentity) -> ClientIdentity { +pub fn get_client_test_identity(id: ShardedHelperIdentity) -> ClientIdentities { let (mut certificate, mut private_key) = get_test_certificate_and_key(id); - ClientIdentity::from_pkcs8(&mut certificate, &mut private_key).unwrap() + let (mut scertificate, mut sprivate_key) = get_test_certificate_and_key(id); + ClientIdentities { + helper: ClientIdentity::from_pkcs8(&mut certificate, &mut private_key).unwrap(), + shard: ClientIdentity::from_pkcs8(&mut scertificate, &mut sprivate_key).unwrap(), + } } -pub const TEST_CERTS: [&[u8]; 3] = [ +const TEST_CERTS: [&[u8]; 6] = [ b"\ -----BEGIN CERTIFICATE----- MIIBZjCCAQ2gAwIBAgIIGGCAUnB4cZcwCgYIKoZIzj0EAwIwFDESMBAGA1UEAwwJ @@ -364,14 +592,50 @@ BAQDAgKkMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAKBggqhkjOPQQD AgNHADBEAiB+K2yadiLIDR7ZvDpyMIXP70gL3CXp7JmVmh8ygFtbjQIgU16wnFBy jn+NXYPeKEWnkCcVKjFED6MevGnOgrJylgY= -----END CERTIFICATE----- +", + b" +-----BEGIN CERTIFICATE----- +MIIBZDCCAQugAwIBAgIIFeKzq6ypfYgwCgYIKoZIzj0EAwIwFDESMBAGA1UEAwwJ +bG9jYWxob3N0MB4XDTI0MTAwNjIyMTEzOFoXDTI1MDEwNTIyMTEzOFowFDESMBAG +A1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAECKdJUHmm +Mmqtvhu4PpWwwZnu+LFjaE8Y9guDNIXN+O9kulFl1hLVMx6WLpoScrLYlvHrQvcq +/BTG24EOKAeaRqNHMEUwFAYDVR0RBA0wC4IJbG9jYWxob3N0MA4GA1UdDwEB/wQE +AwICpDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwCgYIKoZIzj0EAwID +RwAwRAIgBO2SBoLmPikfcovOFpjA8jpY+JuSybeISUKD2GAsXQICIEChXm7/UJ7p +86qXEVsjN2N1pyRd6rUNxLyCaV87ZmfS +-----END CERTIFICATE----- +", + b" +-----BEGIN CERTIFICATE----- +MIIBZTCCAQugAwIBAgIIXTgB/bkN/aUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAwwJ +bG9jYWxob3N0MB4XDTI0MTAwNjIyMTIwM1oXDTI1MDEwNTIyMTIwM1owFDESMBAG +A1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyzSofZIX +XgLUKGumrN3SEXOMOAKXcl1VshTBzvyVwxxnD01WVLgS80/TELEltT8SMj1Cgu7I +tkDx3EVPjq4pOKNHMEUwFAYDVR0RBA0wC4IJbG9jYWxob3N0MA4GA1UdDwEB/wQE +AwICpDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwCgYIKoZIzj0EAwID +SAAwRQIhAN93g0zfB/4VyhNOaY1uCb4af4qMxcz1wp0yZ7HKAyWqAiBVPgv4X7aR +JMepVZwIWJrVhnxdcmzOuONoeLZPZraFpw== +-----END CERTIFICATE----- +", + b" +-----BEGIN CERTIFICATE----- +MIIBZTCCAQugAwIBAgIIXTgB/bkN/aUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAwwJ +bG9jYWxob3N0MB4XDTI0MTAwNjIyMTIwM1oXDTI1MDEwNTIyMTIwM1owFDESMBAG +A1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyzSofZIX +XgLUKGumrN3SEXOMOAKXcl1VshTBzvyVwxxnD01WVLgS80/TELEltT8SMj1Cgu7I +tkDx3EVPjq4pOKNHMEUwFAYDVR0RBA0wC4IJbG9jYWxob3N0MA4GA1UdDwEB/wQE +AwICpDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwCgYIKoZIzj0EAwID +SAAwRQIhAN93g0zfB/4VyhNOaY1uCb4af4qMxcz1wp0yZ7HKAyWqAiBVPgv4X7aR +JMepVZwIWJrVhnxdcmzOuONoeLZPZraFpw== +-----END CERTIFICATE----- ", ]; -pub static TEST_CERTS_DER: Lazy<[CertificateDer; 3]> = Lazy::new(|| { +static TEST_CERTS_DER: Lazy<[CertificateDer; 6]> = Lazy::new(|| { TEST_CERTS.map(|mut pem| rustls_pemfile::certs(&mut pem).flatten().next().unwrap()) }); -pub const TEST_KEYS: [&[u8]; 3] = [ +const TEST_KEYS: [&[u8]; 6] = [ b"\ -----BEGIN PRIVATE KEY----- MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgHmPeGcv6Dy9QWPHD @@ -392,6 +656,27 @@ MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgDfOsXGbO9T6e9mPb u9BeVKo7j/DyX4j3XcqrOYnIwOOhRANCAASEORA/IDvqRGiJpddoyocRa+9HEG2B 6P8vfTTV28Ph7n9YBgJodGd29Kt7Dy2IdCjy7PsOik5KGZ4Ee+a+juKk -----END PRIVATE KEY----- +", + b"\ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgWlbBJGC40HwzwMsd +3a6o6x75HZgRnktVwBoi6/84nPmhRANCAAQIp0lQeaYyaq2+G7g+lbDBme74sWNo +Txj2C4M0hc3472S6UWXWEtUzHpYumhJystiW8etC9yr8FMbbgQ4oB5pG +-----END PRIVATE KEY----- +", + b"\ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgi9TsF4lX49P+GIER +DjyUhMiyRZ52EsD00dGPRA4XJbahRANCAATLNKh9khdeAtQoa6as3dIRc4w4Apdy +XVWyFMHO/JXDHGcPTVZUuBLzT9MQsSW1PxIyPUKC7si2QPHcRU+Orik4 +-----END PRIVATE KEY----- +", + b"\ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgs8cH8I4hrdrqDN/d +p1HENqJEFXMwcERH5JFyW/B6D/ChRANCAAT+nXv66H0vd2omUjYwWDYbGkIiGc6S +jzcyiSIULkaelVYvnEQBYefjGLQwvwbifmMrQ+hfQUT9FNbGRQ788pK9 +-----END PRIVATE KEY----- ", ]; @@ -404,3 +689,91 @@ const TEST_HPKE_PUBLIC_KEY: &str = "\ const TEST_HPKE_PRIVATE_KEY: &str = "\ a0778c3e9960576cbef4312a3b7ca34137880fd588c11047bd8b6a8b70b5a151 "; + +#[cfg(all(test, unit_test))] +mod tests { + use super::{get_test_certificate_and_key, TestConfigBuilder}; + use crate::{ + helpers::HelperIdentity, + net::test::{Ports, TEST_CERTS, TEST_KEYS}, + sharding::{ShardIndex, ShardedHelperIdentity}, + }; + + /// This simple test makes sure that testing networks are created properly. + /// The network itself won't be excersized as that's tested elsewhere. + #[test] + fn create_4_shard_http_network() { + let ports: Vec = vec![ + Ports { + ring: [10000, 10001, 10002], + shards: [10005, 10006, 10007], + }, + Ports { + ring: [10010, 10011, 10012], + shards: [10015, 10016, 10017], + }, + Ports { + ring: [10020, 10021, 10022], + shards: [10025, 10026, 10027], + }, + Ports { + ring: [10030, 10031, 10032], + shards: [10035, 10036, 10037], + }, + ]; + // Providing ports and no https certs to keep this test fast + let conf = TestConfigBuilder::default() + .with_disable_https_option(true) + .with_ports_by_ring(ports) + .build(); + + assert!(conf.disable_https); + assert_eq!(conf.rings.len(), 4); + assert_eq!(conf.shards.len(), 3); + let shards_2 = conf.get_shards_for_helper(HelperIdentity::TWO); + assert_eq!(shards_2.get_first_shard().config.port, Some(10006)); + let second_helper_third_shard_configs = &shards_2.servers[2]; + assert_eq!(second_helper_third_shard_configs.config.port, Some(10026)); + let leader_ring_configs = &conf.leaders_ring().servers; + assert_eq!(leader_ring_configs[2].config.port, Some(10002)); + } + + #[test] + #[should_panic(expected = "Found duplicate port 10001")] + fn overlapping_ports() { + let ports: Vec = vec![Ports { + ring: [10000, 10001, 10002], + shards: [10001, 10006, 10007], + }]; + let _ = TestConfigBuilder::default() + .with_disable_https_option(true) + .with_ports_by_ring(ports) + .build(); + } + + #[test] + fn get_assets_by_index() { + let (c, k) = get_test_certificate_and_key(ShardedHelperIdentity::ONE_FIRST); + assert_eq!(TEST_KEYS[0], k); + assert_eq!(TEST_CERTS[0], c); + } + + #[test] + fn get_default_ports() { + let builder = TestConfigBuilder::with_http_and_default_test_ports(); + assert_eq!( + vec![Some(3000), Some(3001), Some(3002)], + builder.get_ports_for_shard_index(ShardIndex(0)) + ); + assert_eq!( + vec![Some(6001)], + builder.get_ports_for_helper_identity(HelperIdentity::TWO) + ); + } + + #[test] + fn get_os_ports() { + let builder = TestConfigBuilder::default(); + assert_eq!(3, builder.get_ports_for_shard_index(ShardIndex(0)).len()); + } +} diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index a0fcd92a3..0aedcb937 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -348,10 +348,13 @@ impl Transport for ShardHttpTransport { #[cfg(all(test, web_test, descriptive_gate))] mod tests { - use std::{iter::zip, net::TcpListener, task::Poll}; + use std::{iter::zip, task::Poll}; use bytes::Bytes; - use futures::stream::{poll_immediate, StreamExt}; + use futures::{ + future::join, + stream::{poll_immediate, StreamExt}, + }; use futures_util::future::{join_all, try_join_all}; use generic_array::GenericArray; use once_cell::sync::Lazy; @@ -361,7 +364,6 @@ mod tests { use super::*; use crate::{ - config::{NetworkConfig, ServerConfig}, ff::{FieldType, Fp31, Serializable}, helpers::{ make_owned_handler, @@ -369,9 +371,10 @@ mod tests { }, net::{ client::ClientIdentity, - test::{get_test_identity, TestConfig, TestConfigBuilder, TestServer}, + test::{ClientIdentities, TestConfig, TestConfigBuilder, TestServer}, }, secret_sharing::{replicated::semi_honest::AdditiveShare, IntoShares}, + sharding::ShardedHelperIdentity, test_fixture::Reconstruct, AppConfig, AppSetup, HelperApp, }; @@ -446,96 +449,87 @@ mod tests { } // TODO(651): write a test for an error while reading the body (after error handling is finalized) - - async fn make_helpers( - sockets: [TcpListener; 3], - server_config: [ServerConfig; 3], - network_config: &NetworkConfig, - disable_https: bool, - ) -> [HelperApp; 3] { - join_all( - zip(HelperIdentity::make_three(), zip(sockets, server_config)).map( - |(id, (socket, server_config))| async move { - let identity = if disable_https { - ClientIdentity::Header(id) - } else { - get_test_identity(id) - }; - let (setup, handler) = AppSetup::new(AppConfig::default()); - let clients = MpcHelperClient::from_conf( - &IpaRuntime::current(), - network_config, - &identity, - ); - let (transport, server) = MpcHttpTransport::new( - IpaRuntime::current(), - id, - server_config.clone(), - network_config.clone(), - &clients, - Some(handler), - ); - // TODO: Following is just temporary until Shard Transport is actually used. - let shard_clients_config = network_config.client.clone(); - let shard_server_config = server_config; - let shard_network_config = - NetworkConfig::new_shards(vec![], shard_clients_config); - let (shard_transport, _shard_server) = ShardHttpTransport::new( - IpaRuntime::current(), - ShardIndex::FIRST, - shard_server_config, - shard_network_config, - vec![], - None, - ); - // --- - - server - .start_on(&IpaRuntime::current(), Some(socket), ()) - .await; - + async fn make_helpers(mut conf: TestConfig) -> [HelperApp; 3] { + let leaders_ring = conf.rings.pop().unwrap(); + join_all(zip(HelperIdentity::make_three(), leaders_ring.servers).map( + |(id, mut addr_server)| { + let (setup, mpc_handler) = AppSetup::new(AppConfig::default()); + let sid = ShardedHelperIdentity::new(id, ShardIndex::FIRST); + let identities = ClientIdentities::new(conf.disable_https, sid); + + // Ring config + let clients = MpcHelperClient::from_conf( + &IpaRuntime::current(), + &leaders_ring.network, + &identities.helper, + ); + let (transport, server) = MpcHttpTransport::new( + IpaRuntime::current(), + id, + addr_server.config.clone(), + leaders_ring.network.clone(), + &clients, + Some(mpc_handler), + ); + + // Shard Config + let helper_shards = conf.get_shards_for_helper(id); + let addr_shard = helper_shards.get_first_shard(); + let shard_network_config = helper_shards.network.clone(); + let shard_clients = MpcHelperClient::::shards_from_conf( + &IpaRuntime::current(), + &shard_network_config, + &identities.shard, + ); + let (shard_transport, shard_server) = ShardHttpTransport::new( + IpaRuntime::current(), + sid.shard_index, + addr_shard.config.clone(), + shard_network_config, + shard_clients, + None, // This will come online once we go into Query Workflow + ); + + let helper_shards = conf.get_shards_for_helper_mut(id); + let addr_shard = helper_shards.get_first_shard_mut(); + let ring_socket = addr_server.socket.take(); + let sharding_socket = addr_shard.socket.take(); + + async move { + join( + server.start_on(&IpaRuntime::current(), ring_socket, ()), + shard_server.start_on(&IpaRuntime::current(), sharding_socket, ()), + ) + .await; setup.connect(transport, shard_transport) - }, - ), - ) + } + }, + )) .await .try_into() .ok() .unwrap() } - async fn test_three_helpers(mut conf: TestConfig) { + async fn test_three_helpers(conf: TestConfig) { let clients = MpcHelperClient::from_conf( &IpaRuntime::current(), - &conf.network, + &conf.leaders_ring().network, &ClientIdentity::None, ); - let _helpers = make_helpers( - conf.sockets.take().unwrap(), - conf.servers, - &conf.network, - conf.disable_https, - ) - .await; - + let _helpers = make_helpers(conf).await; test_multiply(&clients).await; } #[tokio::test(flavor = "multi_thread")] async fn happy_case_twice() { - let mut conf = TestConfigBuilder::with_open_ports().build(); + let conf = TestConfigBuilder::default().build(); let clients = MpcHelperClient::from_conf( &IpaRuntime::current(), - &conf.network, + &conf.leaders_ring().network, &ClientIdentity::None, ); - let _helpers = make_helpers( - conf.sockets.take().unwrap(), - conf.servers, - &conf.network, - conf.disable_https, - ) - .await; + let _helpers = make_helpers(conf).await; test_multiply(&clients).await; test_multiply(&clients).await; @@ -585,7 +579,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn three_helpers_http() { - let conf = TestConfigBuilder::with_open_ports() + let conf = TestConfigBuilder::default() .with_disable_https_option(true) .build(); test_three_helpers(conf).await; @@ -593,7 +587,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn three_helpers_https() { - let conf = TestConfigBuilder::with_open_ports().build(); + let conf = TestConfigBuilder::default().build(); test_three_helpers(conf).await; } } diff --git a/ipa-core/src/sharding.rs b/ipa-core/src/sharding.rs index 3c88c860b..9972867ca 100644 --- a/ipa-core/src/sharding.rs +++ b/ipa-core/src/sharding.rs @@ -6,6 +6,34 @@ use std::{ use ipa_metrics::LabelValue; +use crate::helpers::{HelperIdentity, TransportIdentity}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ShardedHelperIdentity { + pub helper_identity: HelperIdentity, + pub shard_index: ShardIndex, +} + +impl ShardedHelperIdentity { + pub const ONE_FIRST: ShardedHelperIdentity = ShardedHelperIdentity { + helper_identity: HelperIdentity::ONE, + shard_index: ShardIndex::FIRST, + }; + + #[must_use] + pub fn new(helper_identity: HelperIdentity, shard_index: ShardIndex) -> Self { + Self { + helper_identity, + shard_index, + } + } + + #[must_use] + pub fn as_index(&self) -> usize { + self.shard_index.as_index() * 3 + self.helper_identity.as_index() + } +} + /// A unique zero-based index of the helper shard. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ShardIndex(pub u32);