From 0319d2c64c098900f4d4349ba99000477ae54177 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karen=20C=C3=A1rcamo?=
Date: Tue, 5 Sep 2023 15:00:04 -0600
Subject: [PATCH] Scale out Clickhouse to a multinode cluster (temporarily disabled) (#3494)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Replication

This PR implements an initial ClickHouse setup with 2 replicas and 3 coordinator (Keeper) nodes. I've settled on this lean initial architecture because I want to avoid cluttering the rack with nodes that may turn out to be unnecessary and using up our customers' resources. As we gauge the system alongside our first customers, we can decide whether we really need more replicas. Adding a replica is straightforward: we only need to make a few changes to the templates/service count and restart the ClickHouse services.

## Sharding

Sharding can be very resource intensive, and we have yet to fully understand our customers' needs. I'd like to avoid prematurely optimising while we still have so many unknowns, and we have not had time to perform long-running testing either. See the official ClickHouse [recommendations](https://clickhouse.com/blog/common-getting-started-issues-with-clickhouse#2-going-horizontal-too-early). Like additional replicas, we can add shards down the track if we find them to be necessary.

## Testing

I have left most tests as a single node setup; it feels unnecessary to constantly spin up so many nodes. If people disagree, I can modify this. I have run many, many manual tests, starting and stopping services, and so far the setup has held up.

Using a ClickHouse client:

```console
root@oxz_clickhouse_af08dce0-41ce-4922-8d51-0f546f23ff3e:~# ifconfig
oxControlService13:1: flags=21002000841 mtu 9000 index 2
        inet6 fd00:1122:3344:101::f/64
root@oxz_clickhouse_af08dce0-41ce-4922-8d51-0f546f23ff3e:~# cd /opt/oxide/clickhouse/
root@oxz_clickhouse_af08dce0-41ce-4922-8d51-0f546f23ff3e:/opt/oxide/clickhouse# ./clickhouse client --host fd00:1122:3344:101::f
ClickHouse client version 22.8.9.1.
Connecting to fd00:1122:3344:101::f:9000 as user default.
Connected to ClickHouse server version 22.8.9 revision 54460.

oximeter_cluster node 2 :) SELECT * FROM oximeter.fields_i64

SELECT * FROM oximeter.fields_i64

Query id: dedbfbba-d949-49bd-9f9c-0f81a1240798

┌─timeseries_name───┬───────timeseries_key─┬─field_name─┬─field_value─┐
│ data_link:enabled │  9572423277405807617 │ link_id    │           0 │
│ data_link:enabled │ 12564290087547100823 │ link_id    │           0 │
│ data_link:enabled │ 16314114164963669893 │ link_id    │           0 │
│ data_link:link_up │  9572423277405807617 │ link_id    │           0 │
│ data_link:link_up │ 12564290087547100823 │ link_id    │           0 │
│ data_link:link_up │ 16314114164963669893 │ link_id    │           0 │
└───────────────────┴──────────────────────┴────────────┴─────────────┘

6 rows in set. Elapsed: 0.003 sec.
```
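If you want to poke at a pair of replicas yourself, ClickHouse's standard system tables make for a quick sanity check. The following is only a sketch: `oximeter_cluster` is the cluster name from this PR's replica config, and the host is just the example zone address above, so substitute your own.

```sh
# Does this node know about the cluster defined in the replica config?
./clickhouse client --host fd00:1122:3344:101::f \
    --query "SELECT cluster, shard_num, replica_num, host_name FROM system.clusters WHERE cluster = 'oximeter_cluster'"

# Are the replicated oximeter tables present and healthy on this node?
./clickhouse client --host fd00:1122:3344:101::f \
    --query "SELECT table, is_leader, total_replicas, active_replicas FROM system.replicas WHERE database = 'oximeter'"
```

The new `is_oximeter_cluster` helper in `oximeter/db/src/client.rs` relies on the same idea: it runs `SHOW CLUSTERS` and checks whether the output mentions `oximeter_cluster`.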
To retrieve information about the keepers, you can use the provided [commands](https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#four-letter-word-commands) within each of the keeper zones.

Example:

```console
root@oxz_clickhouse_keeper_9b70b23c-a7c4-4102-a7a1-525537dcf463:~# ifconfig
oxControlService17:1: flags=21002000841 mtu 9000 index 2
        inet6 fd00:1122:3344:101::11/64
root@oxz_clickhouse_keeper_9b70b23c-a7c4-4102-a7a1-525537dcf463:~# echo mntr | nc fd00:1122:3344:101::11 9181
zk_version	v22.8.9.1-lts-ac5a6cababc9153320b1892eece62e1468058c26
zk_avg_latency	0
zk_max_latency	1
zk_min_latency	0
zk_packets_received	60
zk_packets_sent	60
zk_num_alive_connections	1
zk_outstanding_requests	0
zk_server_state	leader
zk_znode_count	6
zk_watch_count	1
zk_ephemerals_count	0
zk_approximate_data_size	1271
zk_key_arena_size	4096
zk_latest_snapshot_size	0
zk_followers	2
zk_synced_followers	2
```

Closes: https://github.com/oxidecomputer/omicron/issues/2158

## Update

As mentioned previously, we agreed at the last control plane meeting that the software installed on the racks should not diverge because of replicated ClickHouse. This means that while ClickHouse [replication is functional](https://github.com/oxidecomputer/omicron/runs/16327261230) in this PR, it has been disabled in the last commit in the following manner:

- The `method_script.sh` for the `clickhouse` service runs in single node mode by default, but can be switched to replicated mode by [swapping a variable to false](https://github.com/oxidecomputer/omicron/pull/3494/files#diff-5475f31ccc4d46ea5ed682a38970067eacc337f0c6cd52581b1609f4ecce6071R31-R49); a simplified sketch of that toggle follows this list. When we migrate all racks to a replicated ClickHouse setup, all logic related to running on a single node will be removed from that file.
- The number of zones defined through RSS will stay the same. Instructions on how to tweak them to launch in replicated mode have been [left in the form of comments](https://github.com/oxidecomputer/omicron/pull/3494/files#diff-9ea2b79544fdd0a21914ea354fba0b3670258746b1350d900285445d399861e1R59-R64).
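For reference, the toggle in `smf/clickhouse/method_script.sh` looks roughly like the sketch below. It is deliberately simplified, not the verbatim script: the real version also passes log-file arguments, and `DATASTORE`, `LISTEN_ADDR`, and `LISTEN_PORT` are assumed here to already be set (the zone's SMF `config` property group added in `sled-agent/src/services.rs` supplies the listen address, port, and store path). Flipping `single_node` to `false` takes the replicated path, which starts by resolving the ClickHouse node hostnames from SRV records in internal DNS.

```sh
# TEMPORARY: racks run single node ClickHouse until Nexus provisions services,
# so the software installed on racks does not diverge
# (https://github.com/oxidecomputer/omicron/issues/732).
single_node=true

command=()
if $single_node
then
    # Single node mode: start the server directly against the local datastore.
    command+=(
        "/opt/oxide/clickhouse/clickhouse" "server"
        "--" "--path" "${DATASTORE}"
        "--listen_host" "$LISTEN_ADDR"
        "--http_port" "$LISTEN_PORT"
    )
else
    # Replicated mode: look up the replica and keeper hostnames via internal
    # DNS and launch with the replica configuration (elided in this sketch).
    echo "replicated mode not shown in this sketch" >&2
    exit 1
fi

exec "${command[@]}"
```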
### Testing

I ran the full CI test suite in both replicated and single node mode. You can find the replicated test results [here](https://github.com/oxidecomputer/omicron/runs/16327261230), and the single node results with replication disabled [here](https://github.com/oxidecomputer/omicron/runs/16360694678).

Additionally, I have added tests that validate the replicated db_init file [here](https://github.com/oxidecomputer/omicron/pull/3494/files#diff-6f5af870905bd92fc2c62db62d674d1e033edee57adbe7ea70d929c79cd03ba1R671), and incorporated [checks](https://github.com/oxidecomputer/omicron/pull/3494/files#diff-6f5af870905bd92fc2c62db62d674d1e033edee57adbe7ea70d929c79cd03ba1R664) into tests that validate whether a ClickHouse instance is part of a cluster.

### Next steps

To keep this PR compact (if you can call 2000 lines compact), I have created several issues from the review comments to tackle after this PR is merged. In prioritised order, these are:

- https://github.com/oxidecomputer/omicron/issues/3982 from this [comment](https://github.com/oxidecomputer/omicron/pull/3494/#issuecomment-1696096624)
- https://github.com/oxidecomputer/omicron/issues/3824 from this [comment](https://github.com/oxidecomputer/omicron/pull/3494/#discussion_r1283695665)
- https://github.com/oxidecomputer/omicron/issues/3823 from this [comment](https://github.com/oxidecomputer/omicron/pull/3494/#discussion_r1284673917)
---
 .github/buildomat/jobs/package.sh | 1 +
 common/src/address.rs | 1 +
 dev-tools/src/bin/omicron-dev.rs | 2 +-
 internal-dns-cli/src/bin/dnswait.rs | 18 +-
 internal-dns/src/config.rs | 4 +
 internal-dns/src/names.rs | 3 +
 nexus/benches/setup_benchmark.rs | 2 +-
 nexus/db-model/src/dataset_kind.rs | 4 +
 nexus/db-model/src/schema.rs | 2 +-
 nexus/db-model/src/service_kind.rs | 4 +
 nexus/test-utils/src/lib.rs | 4 +-
 nexus/tests/integration_tests/oximeter.rs | 5 +-
 nexus/types/src/internal_api/params.rs | 4 +
 openapi/nexus-internal.json | 15 +
 openapi/sled-agent.json | 33 ++
 oximeter/collector/src/lib.rs | 7 +-
 oximeter/db/src/bin/oxdb.rs | 8 +-
 oximeter/db/src/client.rs | 210 ++++++--
 oximeter/db/src/configs/keeper_config.xml | 43 ++
 oximeter/db/src/configs/replica_config.xml | 487 ++++++++++++++++++
 oximeter/db/src/db-replicated-init.sql | 294 +++++++++++
 .../{db-init.sql => db-single-node-init.sql} | 0
 oximeter/db/src/db-wipe-replicated.sql | 1 +
 .../{db-wipe.sql => db-wipe-single-node.sql} | 0
 package-manifest.toml | 32 +-
 schema/all-zone-requests.json | 33 ++
 schema/crdb/4.0.0/up.sql | 37 ++
 schema/crdb/dbinit.sql | 4 +-
 schema/rss-service-plan.json | 33 ++
 sled-agent/src/params.rs | 26 +
 sled-agent/src/rack_setup/plan/service.rs | 45 ++
 sled-agent/src/services.rs | 126 +++--
 smf/clickhouse/config_replica.xml | 82 +++
 smf/clickhouse/method_script.sh | 129 ++++-
 smf/clickhouse_keeper/keeper_config.xml | 43 ++
 smf/clickhouse_keeper/manifest.xml | 43 ++
 smf/clickhouse_keeper/method_script.sh | 128 +++++
 smf/sled-agent/non-gimlet/config.toml | 4 +
 test-utils/src/dev/clickhouse.rs | 354 +++++++++++--
 39 files changed, 2140 insertions(+), 131 deletions(-)
 create mode 100644 oximeter/db/src/configs/keeper_config.xml
 create mode 100644 oximeter/db/src/configs/replica_config.xml
 create mode 100644 oximeter/db/src/db-replicated-init.sql
 rename oximeter/db/src/{db-init.sql => db-single-node-init.sql} (100%)
 create mode 100644 oximeter/db/src/db-wipe-replicated.sql
 rename oximeter/db/src/{db-wipe.sql => db-wipe-single-node.sql} (100%)
 create mode 100644 schema/crdb/4.0.0/up.sql
 create mode 100644 smf/clickhouse/config_replica.xml
 create mode 100644 smf/clickhouse_keeper/keeper_config.xml
 create mode 100644 smf/clickhouse_keeper/manifest.xml
 create mode 100755 smf/clickhouse_keeper/method_script.sh

diff --git a/.github/buildomat/jobs/package.sh b/.github/buildomat/jobs/package.sh index 27656e0c1a..eedb3b1723 100755 --- a/.github/buildomat/jobs/package.sh +++ b/.github/buildomat/jobs/package.sh @@ -104,6 +104,7 @@ ptime -m ./tools/build-global-zone-packages.sh "$tarball_src_dir" /work mkdir -p /work/zones zones=( out/clickhouse.tar.gz + out/clickhouse_keeper.tar.gz out/cockroachdb.tar.gz out/crucible-pantry.tar.gz out/crucible.tar.gz diff --git a/common/src/address.rs b/common/src/address.rs index ca04ce327a..0358787258 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -36,6 +36,7 @@ pub const PROPOLIS_PORT: u16 = 12400; pub const COCKROACH_PORT: u16 = 32221; pub const CRUCIBLE_PORT: u16 = 32345; pub const 
CLICKHOUSE_PORT: u16 = 8123; +pub const CLICKHOUSE_KEEPER_PORT: u16 = 9181; pub const OXIMETER_PORT: u16 = 12223; pub const DENDRITE_PORT: u16 = 12224; pub const DDMD_PORT: u16 = 8000; diff --git a/dev-tools/src/bin/omicron-dev.rs b/dev-tools/src/bin/omicron-dev.rs index c3a906c302..507e3bc918 100644 --- a/dev-tools/src/bin/omicron-dev.rs +++ b/dev-tools/src/bin/omicron-dev.rs @@ -268,7 +268,7 @@ async fn cmd_clickhouse_run(args: &ChRunArgs) -> Result<(), anyhow::Error> { // Start the database server process, possibly on a specific port let mut db_instance = - dev::clickhouse::ClickHouseInstance::new(args.port).await?; + dev::clickhouse::ClickHouseInstance::new_single_node(args.port).await?; println!( "omicron-dev: running ClickHouse with full command:\n\"clickhouse {}\"", db_instance.cmdline().join(" ") diff --git a/internal-dns-cli/src/bin/dnswait.rs b/internal-dns-cli/src/bin/dnswait.rs index 930f767da4..df4832f346 100644 --- a/internal-dns-cli/src/bin/dnswait.rs +++ b/internal-dns-cli/src/bin/dnswait.rs @@ -23,21 +23,31 @@ struct Opt { #[clap(long, action)] nameserver_addresses: Vec, - /// service name to be resolved (should be the target of a DNS name) + /// Service name to be resolved (should be the target of a DNS name) #[arg(value_enum)] srv_name: ServiceName, + + /// Output service host names only, omitting the port + #[clap(long, short = 'H', action)] + hostname_only: bool, } #[derive(Debug, Clone, Copy, ValueEnum)] #[value(rename_all = "kebab-case")] enum ServiceName { Cockroach, + Clickhouse, + ClickhouseKeeper, } impl From for internal_dns::ServiceName { fn from(value: ServiceName) -> Self { match value { ServiceName::Cockroach => internal_dns::ServiceName::Cockroach, + ServiceName::Clickhouse => internal_dns::ServiceName::Clickhouse, + ServiceName::ClickhouseKeeper => { + internal_dns::ServiceName::ClickhouseKeeper + } } } } @@ -91,7 +101,11 @@ async fn main() -> Result<()> { .context("unexpectedly gave up")?; for (target, port) in result { - println!("{}:{}", target, port) + if opt.hostname_only { + println!("{}", target) + } else { + println!("{}:{}", target, port) + } } Ok(()) diff --git a/internal-dns/src/config.rs b/internal-dns/src/config.rs index e1b3129722..5572e193dc 100644 --- a/internal-dns/src/config.rs +++ b/internal-dns/src/config.rs @@ -422,6 +422,10 @@ mod test { #[test] fn display_srv_service() { assert_eq!(ServiceName::Clickhouse.dns_name(), "_clickhouse._tcp",); + assert_eq!( + ServiceName::ClickhouseKeeper.dns_name(), + "_clickhouse-keeper._tcp", + ); assert_eq!(ServiceName::Cockroach.dns_name(), "_cockroach._tcp",); assert_eq!(ServiceName::InternalDns.dns_name(), "_nameservice._tcp",); assert_eq!(ServiceName::Nexus.dns_name(), "_nexus._tcp",); diff --git a/internal-dns/src/names.rs b/internal-dns/src/names.rs index 3f663263e1..663e04bcd9 100644 --- a/internal-dns/src/names.rs +++ b/internal-dns/src/names.rs @@ -17,6 +17,7 @@ pub const DNS_ZONE_EXTERNAL_TESTING: &str = "oxide-dev.test"; #[derive(Clone, Debug, Hash, Eq, Ord, PartialEq, PartialOrd)] pub enum ServiceName { Clickhouse, + ClickhouseKeeper, Cockroach, InternalDns, ExternalDns, @@ -38,6 +39,7 @@ impl ServiceName { fn service_kind(&self) -> &'static str { match self { ServiceName::Clickhouse => "clickhouse", + ServiceName::ClickhouseKeeper => "clickhouse-keeper", ServiceName::Cockroach => "cockroach", ServiceName::ExternalDns => "external-dns", ServiceName::InternalDns => "nameservice", @@ -61,6 +63,7 @@ impl ServiceName { pub(crate) fn dns_name(&self) -> String { match self { 
ServiceName::Clickhouse + | ServiceName::ClickhouseKeeper | ServiceName::Cockroach | ServiceName::InternalDns | ServiceName::ExternalDns diff --git a/nexus/benches/setup_benchmark.rs b/nexus/benches/setup_benchmark.rs index d9e9577a1f..304ccc8325 100644 --- a/nexus/benches/setup_benchmark.rs +++ b/nexus/benches/setup_benchmark.rs @@ -29,7 +29,7 @@ async fn do_crdb_setup() { // Wraps exclusively the ClickhouseDB portion of setup/teardown. async fn do_clickhouse_setup() { let mut clickhouse = - dev::clickhouse::ClickHouseInstance::new(0).await.unwrap(); + dev::clickhouse::ClickHouseInstance::new_single_node(0).await.unwrap(); clickhouse.cleanup().await.unwrap(); } diff --git a/nexus/db-model/src/dataset_kind.rs b/nexus/db-model/src/dataset_kind.rs index f4c6a5eee6..d068f48fd3 100644 --- a/nexus/db-model/src/dataset_kind.rs +++ b/nexus/db-model/src/dataset_kind.rs @@ -19,6 +19,7 @@ impl_enum_type!( Crucible => b"crucible" Cockroach => b"cockroach" Clickhouse => b"clickhouse" + ClickhouseKeeper => b"clickhouse_keeper" ExternalDns => b"external_dns" InternalDns => b"internal_dns" ); @@ -35,6 +36,9 @@ impl From for DatasetKind { internal_api::params::DatasetKind::Clickhouse => { DatasetKind::Clickhouse } + internal_api::params::DatasetKind::ClickhouseKeeper => { + DatasetKind::ClickhouseKeeper + } internal_api::params::DatasetKind::ExternalDns => { DatasetKind::ExternalDns } diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 78ebb527cd..3fde9ee715 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -1130,7 +1130,7 @@ table! { /// /// This should be updated whenever the schema is changed. For more details, /// refer to: schema/crdb/README.adoc -pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(3, 0, 3); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(4, 0, 0); allow_tables_to_appear_in_same_query!( system_update, diff --git a/nexus/db-model/src/service_kind.rs b/nexus/db-model/src/service_kind.rs index afb29abaa7..d5a34f07db 100644 --- a/nexus/db-model/src/service_kind.rs +++ b/nexus/db-model/src/service_kind.rs @@ -18,6 +18,7 @@ impl_enum_type!( // Enum values Clickhouse => b"clickhouse" + ClickhouseKeeper => b"clickhouse_keeper" Cockroach => b"cockroach" Crucible => b"crucible" CruciblePantry => b"crucible_pantry" @@ -54,6 +55,9 @@ impl From for ServiceKind { internal_api::params::ServiceKind::Clickhouse => { ServiceKind::Clickhouse } + internal_api::params::ServiceKind::ClickhouseKeeper => { + ServiceKind::ClickhouseKeeper + } internal_api::params::ServiceKind::Cockroach => { ServiceKind::Cockroach } diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 3b9bf86c2d..fb88ac83a5 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -321,7 +321,9 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { let log = &self.logctx.log; debug!(log, "Starting Clickhouse"); let clickhouse = - dev::clickhouse::ClickHouseInstance::new(0).await.unwrap(); + dev::clickhouse::ClickHouseInstance::new_single_node(0) + .await + .unwrap(); let port = clickhouse.port(); let zpool_id = Uuid::new_v4(); diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 7d1a4b318a..2cda594e18 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -110,7 +110,10 @@ async fn test_oximeter_reregistration() { ); let client = oximeter_db::Client::new(ch_address.into(), &context.logctx.log); - 
client.init_db().await.expect("Failed to initialize timeseries database"); + client + .init_single_node_db() + .await + .expect("Failed to initialize timeseries database"); // Helper to retrieve the timeseries from ClickHouse let timeseries_name = "integration_target:integration_metric"; diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs index 89c7a88f1f..e2a5e3d094 100644 --- a/nexus/types/src/internal_api/params.rs +++ b/nexus/types/src/internal_api/params.rs @@ -125,6 +125,7 @@ pub enum DatasetKind { Crucible, Cockroach, Clickhouse, + ClickhouseKeeper, ExternalDns, InternalDns, } @@ -136,6 +137,7 @@ impl fmt::Display for DatasetKind { Crucible => "crucible", Cockroach => "cockroach", Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", ExternalDns => "external_dns", InternalDns => "internal_dns", }; @@ -168,6 +170,7 @@ pub struct ServiceNic { #[serde(rename_all = "snake_case", tag = "type", content = "content")] pub enum ServiceKind { Clickhouse, + ClickhouseKeeper, Cockroach, Crucible, CruciblePantry, @@ -186,6 +189,7 @@ impl fmt::Display for ServiceKind { use ServiceKind::*; let s = match self { Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", Cockroach => "cockroach", Crucible => "crucible", ExternalDns { .. } => "external_dns", diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 80c5c2dc24..12043d8096 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -922,6 +922,7 @@ "crucible", "cockroach", "clickhouse", + "clickhouse_keeper", "external_dns", "internal_dns" ] @@ -2803,6 +2804,20 @@ "type" ] }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + }, + "required": [ + "type" + ] + }, { "type": "object", "properties": { diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 748c1a4716..c33653c468 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -1091,6 +1091,20 @@ "type" ] }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + }, + "required": [ + "type" + ] + }, { "type": "object", "properties": { @@ -2524,6 +2538,24 @@ "type" ] }, + { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + }, + "required": [ + "address", + "type" + ] + }, { "type": "object", "properties": { @@ -3115,6 +3147,7 @@ "type": "string", "enum": [ "clickhouse", + "clickhouse_keeper", "cockroach_db", "crucible_pantry", "crucible", diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 5367b05056..bf75b567ea 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -321,7 +321,12 @@ impl OximeterAgent { ) }; let client = Client::new(db_address, &log); - client.init_db().await?; + let replicated = client.is_oximeter_cluster().await?; + if !replicated { + client.init_single_node_db().await?; + } else { + client.init_replicated_db().await?; + } // Spawn the task for aggregating and inserting all metrics tokio::spawn(async move { diff --git a/oximeter/db/src/bin/oxdb.rs b/oximeter/db/src/bin/oxdb.rs index 76152d7710..e14fdeb6a8 100644 --- a/oximeter/db/src/bin/oxdb.rs +++ b/oximeter/db/src/bin/oxdb.rs @@ -148,7 +148,7 @@ async fn make_client( let address = SocketAddr::new(address, port); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() 
.await .context("Failed to initialize timeseries database")?; Ok(client) @@ -261,13 +261,13 @@ async fn populate( Ok(()) } -async fn wipe_db( +async fn wipe_single_node_db( address: IpAddr, port: u16, log: Logger, ) -> Result<(), anyhow::Error> { let client = make_client(address, port, &log).await?; - client.wipe_db().await.context("Failed to wipe database") + client.wipe_single_node_db().await.context("Failed to wipe database") } async fn query( @@ -313,7 +313,7 @@ async fn main() { .unwrap(); } Subcommand::Wipe => { - wipe_db(args.address, args.port, log).await.unwrap() + wipe_single_node_db(args.address, args.port, log).await.unwrap() } Subcommand::Query { timeseries_name, diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index f566903f8f..ca45cef0b7 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -248,6 +248,13 @@ impl Client { .map_err(|e| Error::Database(e.to_string())) } + // Verifies if instance is part of oximeter_cluster + pub async fn is_oximeter_cluster(&self) -> Result { + let sql = String::from("SHOW CLUSTERS FORMAT JSONEachRow;"); + let res = self.execute_with_body(sql).await?; + Ok(res.contains("oximeter_cluster")) + } + // Verifies that the schema for a sample matches the schema in the database. // // If the schema exists in the database, and the sample matches that schema, `None` is @@ -432,11 +439,17 @@ pub trait DbWrite { /// Insert the given samples into the database. async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>; - /// Initialize the telemetry database, creating tables as needed. - async fn init_db(&self) -> Result<(), Error>; + /// Initialize the replicated telemetry database, creating tables as needed. + async fn init_replicated_db(&self) -> Result<(), Error>; + + /// Initialize a single node telemetry database, creating tables as needed. + async fn init_single_node_db(&self) -> Result<(), Error>; - /// Wipe the ClickHouse database entirely. - async fn wipe_db(&self) -> Result<(), Error>; + /// Wipe the ClickHouse database entirely from a single node set up. + async fn wipe_single_node_db(&self) -> Result<(), Error>; + + /// Wipe the ClickHouse database entirely from a replicated set up. + async fn wipe_replicated_db(&self) -> Result<(), Error>; } #[async_trait] @@ -538,22 +551,41 @@ impl DbWrite for Client { Ok(()) } - /// Initialize the telemetry database, creating tables as needed. - async fn init_db(&self) -> Result<(), Error> { + /// Initialize the replicated telemetry database, creating tables as needed. + async fn init_replicated_db(&self) -> Result<(), Error> { // The HTTP client doesn't support multiple statements per query, so we break them out here // manually. debug!(self.log, "initializing ClickHouse database"); - let sql = include_str!("./db-init.sql"); + let sql = include_str!("./db-replicated-init.sql"); for query in sql.split("\n--\n") { self.execute(query.to_string()).await?; } Ok(()) } - /// Wipe the ClickHouse database entirely. - async fn wipe_db(&self) -> Result<(), Error> { + /// Initialize a single node telemetry database, creating tables as needed. + async fn init_single_node_db(&self) -> Result<(), Error> { + // The HTTP client doesn't support multiple statements per query, so we break them out here + // manually. 
+ debug!(self.log, "initializing ClickHouse database"); + let sql = include_str!("./db-single-node-init.sql"); + for query in sql.split("\n--\n") { + self.execute(query.to_string()).await?; + } + Ok(()) + } + + /// Wipe the ClickHouse database entirely from a single node set up. + async fn wipe_single_node_db(&self) -> Result<(), Error> { + debug!(self.log, "wiping ClickHouse database"); + let sql = include_str!("./db-wipe-single-node.sql").to_string(); + self.execute(sql).await + } + + /// Wipe the ClickHouse database entirely from a replicated set up. + async fn wipe_replicated_db(&self) -> Result<(), Error> { debug!(self.log, "wiping ClickHouse database"); - let sql = include_str!("./db-wipe.sql").to_string(); + let sql = include_str!("./db-wipe-single-node.sql").to_string(); self.execute(sql).await } } @@ -605,6 +637,8 @@ mod tests { use oximeter::test_util; use oximeter::{Metric, Target}; use slog::o; + use std::time::Duration; + use tokio::time::sleep; // NOTE: It's important that each test run the ClickHouse server with different ports. // The tests each require a clean slate. Previously, we ran the tests in a different thread, @@ -621,28 +655,144 @@ mod tests { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); - Client::new(address, &log).wipe_db().await.unwrap(); + let client = Client::new(address, &log); + assert!(!client.is_oximeter_cluster().await.unwrap()); + + client.wipe_single_node_db().await.unwrap(); db.cleanup().await.expect("Failed to cleanup ClickHouse server"); } + #[tokio::test] + async fn test_build_replicated() { + let log = slog::Logger::root(slog::Discard, o!()); + + // Start all Keeper coordinator nodes + let cur_dir = std::env::current_dir().unwrap(); + let keeper_config = + cur_dir.as_path().join("src/configs/keeper_config.xml"); + + // Start Keeper 1 + let k1_port = 9181; + let k1_id = 1; + + let mut k1 = ClickHouseInstance::new_keeper( + k1_port, + k1_id, + keeper_config.clone(), + ) + .await + .expect("Failed to start ClickHouse keeper 1"); + + // Start Keeper 2 + let k2_port = 9182; + let k2_id = 2; + + let mut k2 = ClickHouseInstance::new_keeper( + k2_port, + k2_id, + keeper_config.clone(), + ) + .await + .expect("Failed to start ClickHouse keeper 2"); + + // Start Keeper 3 + let k3_port = 9183; + let k3_id = 3; + + let mut k3 = + ClickHouseInstance::new_keeper(k3_port, k3_id, keeper_config) + .await + .expect("Failed to start ClickHouse keeper 3"); + + // Start all replica nodes + let cur_dir = std::env::current_dir().unwrap(); + let replica_config = + cur_dir.as_path().join("src/configs/replica_config.xml"); + + // Start Replica 1 + let r1_port = 8123; + let r1_tcp_port = 9000; + let r1_interserver_port = 9009; + let r1_name = String::from("oximeter_cluster node 1"); + let r1_number = String::from("01"); + let mut db_1 = ClickHouseInstance::new_replicated( + r1_port, + r1_tcp_port, + r1_interserver_port, + r1_name, + r1_number, + replica_config.clone(), + ) + .await + .expect("Failed to start ClickHouse node 1"); + let r1_address = + SocketAddr::new("127.0.0.1".parse().unwrap(), db_1.port()); + + // Start Replica 2 + let r2_port = 8124; + let r2_tcp_port = 9001; + let r2_interserver_port = 9010; + let r2_name = String::from("oximeter_cluster node 2"); + let r2_number = 
String::from("02"); + let mut db_2 = ClickHouseInstance::new_replicated( + r2_port, + r2_tcp_port, + r2_interserver_port, + r2_name, + r2_number, + replica_config, + ) + .await + .expect("Failed to start ClickHouse node 2"); + let r2_address = + SocketAddr::new("127.0.0.1".parse().unwrap(), db_2.port()); + + // Create database in node 1 + let client_1 = Client::new(r1_address, &log); + assert!(client_1.is_oximeter_cluster().await.unwrap()); + client_1 + .init_replicated_db() + .await + .expect("Failed to initialize timeseries database"); + + // Wait to make sure data has been synchronised. + // TODO(https://github.com/oxidecomputer/omicron/issues/4001): Waiting for 5 secs is a bit sloppy, + // come up with a better way to do this. + sleep(Duration::from_secs(5)).await; + + // Verify database exists in node 2 + let client_2 = Client::new(r2_address, &log); + assert!(client_2.is_oximeter_cluster().await.unwrap()); + let sql = String::from("SHOW DATABASES FORMAT JSONEachRow;"); + + let result = client_2.execute_with_body(sql).await.unwrap(); + assert!(result.contains("oximeter")); + + k1.cleanup().await.expect("Failed to cleanup ClickHouse keeper 1"); + k2.cleanup().await.expect("Failed to cleanup ClickHouse keeper 2"); + k3.cleanup().await.expect("Failed to cleanup ClickHouse keeper 3"); + db_1.cleanup().await.expect("Failed to cleanup ClickHouse server 1"); + db_2.cleanup().await.expect("Failed to cleanup ClickHouse server 2"); + } + #[tokio::test] async fn test_client_insert() { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); let samples = { @@ -681,14 +831,14 @@ mod tests { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); let sample = test_util::make_sample(); @@ -715,14 +865,14 @@ mod tests { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); let sample = test_util::make_sample(); @@ -793,14 +943,14 @@ mod tests { let log = slog::Logger::root(slog_dtrace::Dtrace::new().0, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let db = ClickHouseInstance::new(0) + let db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await 
.expect("Failed to initialize timeseries database"); @@ -991,14 +1141,14 @@ mod tests { let log = Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let db = ClickHouseInstance::new(0) + let db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); @@ -1090,14 +1240,14 @@ mod tests { test_fn: impl Fn(&Service, &[RequestLatency], &[Sample], &[Timeseries]), ) { let (target, metrics, samples) = setup_select_test(); - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client @@ -1347,14 +1497,14 @@ mod tests { #[tokio::test] async fn test_select_timeseries_with_start_time() { let (_, metrics, samples) = setup_select_test(); - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client @@ -1391,14 +1541,14 @@ mod tests { #[tokio::test] async fn test_select_timeseries_with_limit() { let (_, _, samples) = setup_select_test(); - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client @@ -1509,14 +1659,14 @@ mod tests { #[tokio::test] async fn test_select_timeseries_with_order() { let (_, _, samples) = setup_select_test(); - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client diff --git a/oximeter/db/src/configs/keeper_config.xml b/oximeter/db/src/configs/keeper_config.xml new file mode 100644 index 0000000000..19ab99f909 --- /dev/null +++ b/oximeter/db/src/configs/keeper_config.xml @@ -0,0 +1,43 @@ + + + + trace + + + 1000M + 3 + + + + + + + + + + + + + 10000 + 30000 + trace + + + + + + 9234 + + + + + 9235 + + + + + 9236 + + + + diff --git a/oximeter/db/src/configs/replica_config.xml b/oximeter/db/src/configs/replica_config.xml new file mode 100644 index 0000000000..6a2cab5862 --- /dev/null +++ b/oximeter/db/src/configs/replica_config.xml @@ -0,0 +1,487 @@ + + + + + + + + trace + true + + + 1000M + 3 + + + + + + + + + + + + + + + + + + 4096 + + + 3 + + + + + false + + + /path/to/ssl_cert_file + /path/to/ssl_key_file + + + false + + + 
/path/to/ssl_ca_cert_file + + + none + + + 0 + + + -1 + -1 + + + false + + + + + + + + + + none + true + true + sslv2,sslv3 + true + + + + true + true + sslv2,sslv3 + true + + + + RejectCertificateHandler + + + + + + + + + 0 + + + 100 + + + 0 + + + + 10000 + + + + + + 0.9 + + + 4194304 + + + 0 + + + + + + 8589934592 + + + 5368709120 + + + + 1000 + + + 134217728 + + + 10000 + + + + + + + + + + + + + + false + + + false + + + false + + + false + + + + + random + + + + + + + + + ::/0 + + default + default + + + + + + + 3600 + 0 + 0 + 0 + 0 + 0 + + + + + + default + + + + + + default + + + true + + + false + + + + 01 + + oximeter_cluster + + + + + + mysecretphrase + + true + + + 9000 + + + + 9001 + + + + + + + + + 9181 + + + + 9182 + + + + 9183 + + + + + + 3600 + + + 3600 + + + 60 + + + + + + system + query_log
+ toYYYYMM(event_date) + + + 7500 +
+ + + + system + trace_log
+ + toYYYYMM(event_date) + 7500 +
+ + + + system + query_thread_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + part_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + metric_log
+ 7500 + 1000 +
+ + + + system + asynchronous_metric_log
+ + 7000 +
+ + + + system + crash_log
+ + + 1000 +
+ + + + system + processors_profile_log
+ + toYYYYMM(event_date) + 7500 +
+ + + + + /clickhouse/task_queue/ddl + + + 604800 + + + 60 + + + 1000 + + + + + + + + hide encrypt/decrypt arguments + ((?:aes_)?(?:encrypt|decrypt)(?:_mysql)?)\s*\(\s*(?:'(?:\\'|.)+'|.*?)\s*\) + \1(???) + + + + + false + false + +
diff --git a/oximeter/db/src/db-replicated-init.sql b/oximeter/db/src/db-replicated-init.sql new file mode 100644 index 0000000000..ddd973b825 --- /dev/null +++ b/oximeter/db/src/db-replicated-init.sql @@ -0,0 +1,294 @@ +CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bool_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bool_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bool_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_i64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_string_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum String +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_string_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_string ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum String +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_string_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes_local ON CLUSTER oximeter_cluster +( + 
timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Array(UInt8) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bytes_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Array(UInt8) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bytes_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativei64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativei64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativef64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativef64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int64), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int64), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS 
oximeter.measurements_histogramf64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float64), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramf64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float64), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramf64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_bool ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt8 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int64 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value IPv6 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_string ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value String +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_uuid ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UUID +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.timeseries_schema ON CLUSTER oximeter_cluster +( + timeseries_name String, + fields Nested( + name String, + type Enum( + 'Bool' = 1, + 'I64' = 2, + 'IpAddr' = 3, + 'String' = 4, + 'Uuid' = 6 + ), + source Enum( + 'Target' = 1, + 'Metric' = 2 + ) + ), + datum_type Enum( + 'Bool' = 1, + 'I64' = 2, + 'F64' = 3, + 'String' = 4, + 'Bytes' = 5, + 'CumulativeI64' = 6, + 'CumulativeF64' = 7, + 'HistogramI64' = 8, + 'HistogramF64' = 9 + ), + created DateTime64(9, 'UTC') +) +ENGINE = ReplicatedMergeTree() +ORDER BY (timeseries_name, fields.name); diff --git a/oximeter/db/src/db-init.sql b/oximeter/db/src/db-single-node-init.sql similarity index 100% rename from oximeter/db/src/db-init.sql rename to oximeter/db/src/db-single-node-init.sql diff --git a/oximeter/db/src/db-wipe-replicated.sql b/oximeter/db/src/db-wipe-replicated.sql new file mode 100644 index 0000000000..1ed4d270b7 --- /dev/null +++ b/oximeter/db/src/db-wipe-replicated.sql @@ -0,0 +1 @@ +DROP DATABASE IF EXISTS oximeter ON CLUSTER oximeter_cluster; diff --git a/oximeter/db/src/db-wipe.sql b/oximeter/db/src/db-wipe-single-node.sql similarity 
index 100% rename from oximeter/db/src/db-wipe.sql rename to oximeter/db/src/db-wipe-single-node.sql diff --git a/package-manifest.toml b/package-manifest.toml index 532510058a..728ed80079 100644 --- a/package-manifest.toml +++ b/package-manifest.toml @@ -114,13 +114,43 @@ output.type = "zone" [package.clickhouse] service_name = "clickhouse" only_for_targets.image = "standard" +source.type = "composite" +source.packages = [ "clickhouse_svc.tar.gz", "internal-dns-cli.tar.gz" ] +output.type = "zone" + +[package.clickhouse_svc] +service_name = "clickhouse_svc" +only_for_targets.image = "standard" source.type = "local" source.paths = [ { from = "out/clickhouse", to = "/opt/oxide/clickhouse" }, { from = "smf/clickhouse/manifest.xml", to = "/var/svc/manifest/site/clickhouse/manifest.xml" }, - { from = "smf/clickhouse/method_script.sh", to = "/opt/oxide/lib/svc/manifest/clickhouse.sh" } + { from = "smf/clickhouse/method_script.sh", to = "/opt/oxide/lib/svc/manifest/clickhouse.sh" }, + { from = "smf/clickhouse/config_replica.xml", to = "/opt/oxide/clickhouse/config.d/config_replica.xml" } ] output.type = "zone" +output.intermediate_only = true +setup_hint = "Run `./tools/ci_download_clickhouse` to download the necessary binaries" + +[package.clickhouse_keeper] +service_name = "clickhouse_keeper" +only_for_targets.image = "standard" +source.type = "composite" +source.packages = [ "clickhouse_keeper_svc.tar.gz", "internal-dns-cli.tar.gz" ] +output.type = "zone" + +[package.clickhouse_keeper_svc] +service_name = "clickhouse_keeper_svc" +only_for_targets.image = "standard" +source.type = "local" +source.paths = [ + { from = "out/clickhouse", to = "/opt/oxide/clickhouse_keeper" }, + { from = "smf/clickhouse_keeper/manifest.xml", to = "/var/svc/manifest/site/clickhouse_keeper/manifest.xml" }, + { from = "smf/clickhouse_keeper/method_script.sh", to = "/opt/oxide/lib/svc/manifest/clickhouse_keeper.sh" }, + { from = "smf/clickhouse_keeper/keeper_config.xml", to = "/opt/oxide/clickhouse_keeper/keeper_config.xml" } +] +output.type = "zone" +output.intermediate_only = true setup_hint = "Run `./tools/ci_download_clickhouse` to download the necessary binaries" [package.cockroachdb] diff --git a/schema/all-zone-requests.json b/schema/all-zone-requests.json index 72f60eecac..468f00ee0c 100644 --- a/schema/all-zone-requests.json +++ b/schema/all-zone-requests.json @@ -63,6 +63,20 @@ } } }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -554,6 +568,24 @@ } } }, + { + "type": "object", + "required": [ + "address", + "type" + ], + "properties": { + "address": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -706,6 +738,7 @@ "type": "string", "enum": [ "clickhouse", + "clickhouse_keeper", "cockroach_db", "crucible_pantry", "crucible", diff --git a/schema/crdb/4.0.0/up.sql b/schema/crdb/4.0.0/up.sql new file mode 100644 index 0000000000..f87308395a --- /dev/null +++ b/schema/crdb/4.0.0/up.sql @@ -0,0 +1,37 @@ +-- CRDB documentation recommends the following: +-- "Execute schema changes either as single statements (as an implicit transaction), +-- or in an explicit transaction consisting of the single schema change statement." +-- +-- For each schema change, we transactionally: +-- 1. Check the current version +-- 2. 
Apply the idempotent update + +BEGIN; +SELECT CAST( + IF( + ( + SELECT version = '3.0.3' and target_version = '4.0.0' + FROM omicron.public.db_metadata WHERE singleton = true + ), + 'true', + 'Invalid starting version for schema change' + ) AS BOOL +); + +ALTER TYPE omicron.public.service_kind ADD VALUE IF NOT EXISTS 'clickhouse_keeper'; +COMMIT; + +BEGIN; +SELECT CAST( + IF( + ( + SELECT version = '3.0.3' and target_version = '4.0.0' + FROM omicron.public.db_metadata WHERE singleton = true + ), + 'true', + 'Invalid starting version for schema change' + ) AS BOOL +); + +ALTER TYPE omicron.public.dataset_kind ADD VALUE IF NOT EXISTS 'clickhouse_keeper'; +COMMIT; \ No newline at end of file diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 0d612e36ac..4b38b7dfe4 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -188,6 +188,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS lookup_switch_by_rack ON omicron.public.switch CREATE TYPE IF NOT EXISTS omicron.public.service_kind AS ENUM ( 'clickhouse', + 'clickhouse_keeper', 'cockroach', 'crucible', 'crucible_pantry', @@ -391,6 +392,7 @@ CREATE TYPE IF NOT EXISTS omicron.public.dataset_kind AS ENUM ( 'crucible', 'cockroach', 'clickhouse', + 'clickhouse_keeper', 'external_dns', 'internal_dns' ); @@ -2560,7 +2562,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - ( TRUE, NOW(), NOW(), '3.0.3', NULL) + ( TRUE, NOW(), NOW(), '4.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/rss-service-plan.json b/schema/rss-service-plan.json index 2c6f1050da..725caf0900 100644 --- a/schema/rss-service-plan.json +++ b/schema/rss-service-plan.json @@ -63,6 +63,20 @@ } } }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -654,6 +668,24 @@ } } }, + { + "type": "object", + "required": [ + "address", + "type" + ], + "properties": { + "address": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -833,6 +865,7 @@ "type": "string", "enum": [ "clickhouse", + "clickhouse_keeper", "cockroach_db", "crucible_pantry", "crucible", diff --git a/sled-agent/src/params.rs b/sled-agent/src/params.rs index fcd6939413..5ef9594a2a 100644 --- a/sled-agent/src/params.rs +++ b/sled-agent/src/params.rs @@ -220,6 +220,7 @@ pub enum DatasetKind { CockroachDb, Crucible, Clickhouse, + ClickhouseKeeper, ExternalDns, InternalDns, } @@ -231,6 +232,7 @@ impl From for sled_agent_client::types::DatasetKind { CockroachDb => Self::CockroachDb, Crucible => Self::Crucible, Clickhouse => Self::Clickhouse, + ClickhouseKeeper => Self::ClickhouseKeeper, ExternalDns => Self::ExternalDns, InternalDns => Self::InternalDns, } @@ -244,6 +246,7 @@ impl From for nexus_client::types::DatasetKind { CockroachDb => Self::Cockroach, Crucible => Self::Crucible, Clickhouse => Self::Clickhouse, + ClickhouseKeeper => Self::ClickhouseKeeper, ExternalDns => Self::ExternalDns, InternalDns => Self::InternalDns, } @@ -257,6 +260,7 @@ impl std::fmt::Display for DatasetKind { Crucible => "crucible", CockroachDb { .. } => "cockroachdb", Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", ExternalDns { .. } => "external_dns", InternalDns { .. 
} => "internal_dns", }; @@ -356,6 +360,9 @@ pub enum ServiceType { Clickhouse { address: SocketAddrV6, }, + ClickhouseKeeper { + address: SocketAddrV6, + }, CockroachDb { address: SocketAddrV6, }, @@ -382,6 +389,9 @@ impl std::fmt::Display for ServiceType { ServiceType::Maghemite { .. } => write!(f, "mg-ddm"), ServiceType::SpSim => write!(f, "sp-sim"), ServiceType::Clickhouse { .. } => write!(f, "clickhouse"), + ServiceType::ClickhouseKeeper { .. } => { + write!(f, "clickhouse_keeper") + } ServiceType::CockroachDb { .. } => write!(f, "cockroachdb"), ServiceType::Crucible { .. } => write!(f, "crucible"), } @@ -484,6 +494,9 @@ impl TryFrom for sled_agent_client::types::ServiceType { St::Clickhouse { address } => { Ok(AutoSt::Clickhouse { address: address.to_string() }) } + St::ClickhouseKeeper { address } => { + Ok(AutoSt::ClickhouseKeeper { address: address.to_string() }) + } St::CockroachDb { address } => { Ok(AutoSt::CockroachDb { address: address.to_string() }) } @@ -508,6 +521,7 @@ impl TryFrom for sled_agent_client::types::ServiceType { #[serde(rename_all = "snake_case")] pub enum ZoneType { Clickhouse, + ClickhouseKeeper, CockroachDb, CruciblePantry, Crucible, @@ -523,6 +537,7 @@ impl From for sled_agent_client::types::ZoneType { fn from(zt: ZoneType) -> Self { match zt { ZoneType::Clickhouse => Self::Clickhouse, + ZoneType::ClickhouseKeeper => Self::ClickhouseKeeper, ZoneType::CockroachDb => Self::CockroachDb, ZoneType::Crucible => Self::Crucible, ZoneType::CruciblePantry => Self::CruciblePantry, @@ -541,6 +556,7 @@ impl std::fmt::Display for ZoneType { use ZoneType::*; let name = match self { Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", CockroachDb => "cockroachdb", Crucible => "crucible", CruciblePantry => "crucible_pantry", @@ -611,6 +627,7 @@ impl ServiceZoneRequest { ZoneType::Switch => None, // All other zones should be identified by their zone UUID. ZoneType::Clickhouse + | ZoneType::ClickhouseKeeper | ZoneType::CockroachDb | ZoneType::Crucible | ZoneType::ExternalDns @@ -760,6 +777,15 @@ impl ServiceZoneRequest { kind: NexusTypes::ServiceKind::Clickhouse, }); } + ServiceType::ClickhouseKeeper { address } => { + services.push(NexusTypes::ServicePutRequest { + service_id, + zone_id, + sled_id, + address: address.to_string(), + kind: NexusTypes::ServiceKind::ClickhouseKeeper, + }); + } ServiceType::Crucible { address } => { services.push(NexusTypes::ServicePutRequest { service_id, diff --git a/sled-agent/src/rack_setup/plan/service.rs b/sled-agent/src/rack_setup/plan/service.rs index 782188b2ec..2183aa7b63 100644 --- a/sled-agent/src/rack_setup/plan/service.rs +++ b/sled-agent/src/rack_setup/plan/service.rs @@ -56,7 +56,12 @@ const CRDB_COUNT: usize = 5; const OXIMETER_COUNT: usize = 1; // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove // when Nexus provisions Clickhouse. +// TODO(https://github.com/oxidecomputer/omicron/issues/4000): Set to 2 once we enable replicated ClickHouse const CLICKHOUSE_COUNT: usize = 1; +// TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove +// when Nexus provisions Clickhouse keeper. +// TODO(https://github.com/oxidecomputer/omicron/issues/4000): Set to 3 once we enable replicated ClickHouse +const CLICKHOUSE_KEEPER_COUNT: usize = 0; // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove. // when Nexus provisions Crucible. const MINIMUM_U2_ZPOOL_COUNT: usize = 3; @@ -552,6 +557,46 @@ impl Plan { }); } + // Provision Clickhouse Keeper zones, continuing to stripe across sleds. 
+ // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove + // Temporary linter rule until replicated Clickhouse is enabled + #[allow(clippy::reversed_empty_ranges)] + for _ in 0..CLICKHOUSE_KEEPER_COUNT { + let sled = { + let which_sled = + sled_allocator.next().ok_or(PlanError::NotEnoughSleds)?; + &mut sled_info[which_sled] + }; + let id = Uuid::new_v4(); + let ip = sled.addr_alloc.next().expect("Not enough addrs"); + let port = omicron_common::address::CLICKHOUSE_KEEPER_PORT; + let address = SocketAddrV6::new(ip, port, 0, 0); + let zone = dns_builder.host_zone(id, ip).unwrap(); + dns_builder + .service_backend_zone( + ServiceName::ClickhouseKeeper, + &zone, + port, + ) + .unwrap(); + let dataset_name = + sled.alloc_from_u2_zpool(DatasetKind::ClickhouseKeeper)?; + sled.request.services.push(ServiceZoneRequest { + id, + zone_type: ZoneType::ClickhouseKeeper, + addresses: vec![ip], + dataset: Some(DatasetRequest { + id, + name: dataset_name, + service_address: address, + }), + services: vec![ServiceZoneService { + id, + details: ServiceType::ClickhouseKeeper { address }, + }], + }); + } + // Provision Crucible Pantry zones, continuing to stripe across sleds. // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove for _ in 0..PANTRY_COUNT { diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index 68c067a457..ceda5c75b7 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -65,6 +65,7 @@ use itertools::Itertools; use omicron_common::address::Ipv6Subnet; use omicron_common::address::AZ_PREFIX; use omicron_common::address::BOOTSTRAP_ARTIFACT_PORT; +use omicron_common::address::CLICKHOUSE_KEEPER_PORT; use omicron_common::address::CLICKHOUSE_PORT; use omicron_common::address::COCKROACH_PORT; use omicron_common::address::CRUCIBLE_PANTRY_PORT; @@ -998,6 +999,41 @@ impl ServiceManager { Ok(()) } + async fn dns_install( + info: &SledAgentInfo, + ) -> Result { + // We want to configure the dns/install SMF service inside the + // zone with the list of DNS nameservers. This will cause + // /etc/resolv.conf to be populated inside the zone. To do + // this, we need the full list of nameservers. Fortunately, the + // nameservers provide a DNS name for the full list of + // nameservers. + // + // Note that when we configure the dns/install service, we're + // supplying values for an existing property group on the SMF + // *service*. We're not creating a new property group, nor are + // we configuring a property group on the instance. + let all_nameservers = info + .resolver + .lookup_all_ipv6(internal_dns::ServiceName::InternalDns) + .await?; + let mut dns_config_builder = PropertyGroupBuilder::new("install_props"); + for ns_addr in &all_nameservers { + dns_config_builder = dns_config_builder.add_property( + "nameserver", + "net_address", + &ns_addr.to_string(), + ); + } + Ok(ServiceBuilder::new("network/dns/install") + .add_property_group(dns_config_builder) + // We do need to enable the default instance of the + // dns/install service. It's enough to just mention it + // here, as the ServiceInstanceBuilder always enables the + // instance being added. 
+ .add_instance(ServiceInstanceBuilder::new("default"))) + } + async fn initialize_zone( &self, request: &ZoneRequest, @@ -1082,6 +1118,9 @@ impl ServiceManager { let Some(info) = self.inner.sled_info.get() else { return Err(Error::SledAgentNotReady); }; + + let dns_service = Self::dns_install(info).await?; + let datalink = installed_zone.get_control_vnic_name(); let gateway = &info.underlay_address.to_string(); assert_eq!(request.zone.addresses.len(), 1); @@ -1094,13 +1133,15 @@ impl ServiceManager { .add_property("listen_addr", "astring", listen_addr) .add_property("listen_port", "astring", listen_port) .add_property("store", "astring", "/data"); - - let profile = ProfileBuilder::new("omicron").add_service( + let clickhouse_service = ServiceBuilder::new("oxide/clickhouse").add_instance( ServiceInstanceBuilder::new("default") .add_property_group(config), - ), - ); + ); + + let profile = ProfileBuilder::new("omicron") + .add_service(clickhouse_service) + .add_service(dns_service); profile .add_to_zone(&self.inner.log, &installed_zone) .await @@ -1109,42 +1150,51 @@ impl ServiceManager { })?; return Ok(RunningZone::boot(installed_zone).await?); } + ZoneType::ClickhouseKeeper => { + let Some(info) = self.inner.sled_info.get() else { + return Err(Error::SledAgentNotReady); + }; + + let dns_service = Self::dns_install(info).await?; + + let datalink = installed_zone.get_control_vnic_name(); + let gateway = &info.underlay_address.to_string(); + assert_eq!(request.zone.addresses.len(), 1); + let listen_addr = &request.zone.addresses[0].to_string(); + let listen_port = &CLICKHOUSE_KEEPER_PORT.to_string(); + + let config = PropertyGroupBuilder::new("config") + .add_property("datalink", "astring", datalink) + .add_property("gateway", "astring", gateway) + .add_property("listen_addr", "astring", listen_addr) + .add_property("listen_port", "astring", listen_port) + .add_property("store", "astring", "/data"); + let clickhouse_keeper_service = + ServiceBuilder::new("oxide/clickhouse_keeper") + .add_instance( + ServiceInstanceBuilder::new("default") + .add_property_group(config), + ); + let profile = ProfileBuilder::new("omicron") + .add_service(clickhouse_keeper_service) + .add_service(dns_service); + profile + .add_to_zone(&self.inner.log, &installed_zone) + .await + .map_err(|err| { + Error::io( + "Failed to setup clickhouse keeper profile", + err, + ) + })?; + return Ok(RunningZone::boot(installed_zone).await?); + } ZoneType::CockroachDb => { let Some(info) = self.inner.sled_info.get() else { return Err(Error::SledAgentNotReady); }; - // We want to configure the dns/install SMF service inside the - // zone with the list of DNS nameservers. This will cause - // /etc/resolv.conf to be populated inside the zone. To do - // this, we need the full list of nameservers. Fortunately, the - // nameservers provide a DNS name for the full list of - // nameservers. - // - // Note that when we configure the dns/install service, we're - // supplying values for an existing property group on the SMF - // *service*. We're not creating a new property group, nor are - // we configuring a property group on the instance. 
- let all_nameservers = info - .resolver - .lookup_all_ipv6(internal_dns::ServiceName::InternalDns) - .await?; - let mut dns_config_builder = - PropertyGroupBuilder::new("install_props"); - for ns_addr in &all_nameservers { - dns_config_builder = dns_config_builder.add_property( - "nameserver", - "net_address", - &ns_addr.to_string(), - ); - } - let dns_install = ServiceBuilder::new("network/dns/install") - .add_property_group(dns_config_builder) - // We do need to enable the default instance of the - // dns/install service. It's enough to just mention it - // here, as the ServiceInstanceBuilder always enables the - // instance being added. - .add_instance(ServiceInstanceBuilder::new("default")); + let dns_service = Self::dns_install(info).await?; // Configure the CockroachDB service. let datalink = installed_zone.get_control_vnic_name(); @@ -1171,7 +1221,7 @@ impl ServiceManager { let profile = ProfileBuilder::new("omicron") .add_service(cockroachdb_service) - .add_service(dns_install); + .add_service(dns_service); profile .add_to_zone(&self.inner.log, &installed_zone) .await @@ -1570,7 +1620,6 @@ impl ServiceManager { } ServiceType::Oximeter { address } => { info!(self.inner.log, "Setting up oximeter service"); - smfh.setprop("config/id", request.zone.id)?; smfh.setprop("config/address", address.to_string())?; smfh.refresh()?; @@ -1946,7 +1995,8 @@ impl ServiceManager { ServiceType::Crucible { .. } | ServiceType::CruciblePantry { .. } | ServiceType::CockroachDb { .. } - | ServiceType::Clickhouse { .. } => { + | ServiceType::Clickhouse { .. } + | ServiceType::ClickhouseKeeper { .. } => { panic!( "{} is a service which exists as part of a self-assembling zone", service.details, diff --git a/smf/clickhouse/config_replica.xml b/smf/clickhouse/config_replica.xml new file mode 100644 index 0000000000..180b906f64 --- /dev/null +++ b/smf/clickhouse/config_replica.xml @@ -0,0 +1,82 @@ + + + trace + true + + + 1000M + 3 + + + + + + + + + + + + + + + + + + + + + + + + + 01 + + oximeter_cluster + + + + + + mysecretphrase + + true + + + 9000 + + + + 9000 + + + + + + + + 9181 + + + + 9181 + + + + 9181 + + + + + + + + 604800 + + + 60 + + + 1000 + + diff --git a/smf/clickhouse/method_script.sh b/smf/clickhouse/method_script.sh index c278432110..3cc8c585ad 100755 --- a/smf/clickhouse/method_script.sh +++ b/smf/clickhouse/method_script.sh @@ -28,13 +28,136 @@ ipadm show-addr "$DATALINK/ll" || ipadm create-addr -t -T addrconf "$DATALINK/ll ipadm show-addr "$DATALINK/omicron6" || ipadm create-addr -t -T static -a "$LISTEN_ADDR" "$DATALINK/omicron6" route get -inet6 default -inet6 "$GATEWAY" || route add -inet6 default -inet6 "$GATEWAY" -args=( +# TEMPORARY: Racks will be set up with single node ClickHouse until +# Nexus provisions services so there is no divergence between racks +# https://github.com/oxidecomputer/omicron/issues/732 +single_node=true + +command=() +# TODO((https://github.com/oxidecomputer/omicron/issues/4000)): Remove single node mode once all racks are running in replicated mode +if $single_node +then + command+=( + "/opt/oxide/clickhouse/clickhouse" "server" "--log-file" "/var/tmp/clickhouse-server.log" "--errorlog-file" "/var/tmp/clickhouse-server.errlog" "--" "--path" "${DATASTORE}" "--listen_host" "$LISTEN_ADDR" "--http_port" "$LISTEN_PORT" -) + ) +else + # Retrieve hostnames (SRV records in internal DNS) of the clickhouse nodes. 
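+ # Each hostname is expected to look like + # <zone-uuid>.host.control-plane.oxide.internal; the checks below reject + # anything without that suffix.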
+ CH_ADDRS="$(/opt/oxide/internal-dns-cli/bin/dnswait clickhouse -H)" + + if [[ -z "$CH_ADDRS" ]]; then + printf 'ERROR: found no hostnames for other ClickHouse nodes\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + declare -a nodes=($CH_ADDRS) + + for i in "${nodes[@]}" + do + if ! grep -q "host.control-plane.oxide.internal" <<< "${i}"; then + printf 'ERROR: retrieved ClickHouse hostname does not match the expected format\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + done + + # Assign hostnames to replicas + REPLICA_HOST_01="${nodes[0]}" + REPLICA_HOST_02="${nodes[1]}" + + # Retrieve hostnames (SRV records in internal DNS) of the keeper nodes. + K_ADDRS="$(/opt/oxide/internal-dns-cli/bin/dnswait clickhouse-keeper -H)" + + if [[ -z "$K_ADDRS" ]]; then + printf 'ERROR: found no hostnames for other ClickHouse Keeper nodes\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + declare -a keepers=($K_ADDRS) + + for i in "${keepers[@]}" + do + if ! grep -q "host.control-plane.oxide.internal" <<< "${i}"; then + printf 'ERROR: retrieved ClickHouse Keeper hostname does not match the expected format\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + done + + if [[ "${#keepers[@]}" != 3 ]] + then + printf "ERROR: expected 3 ClickHouse Keeper hosts, found "${#keepers[@]}" instead\n" >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + # Identify the node type this is as this will influence how the config is constructed + # TODO(https://github.com/oxidecomputer/omicron/issues/3824): There are probably much + # better ways to do this service discovery, but this works for now. + # The services contain the same IDs as the hostnames. + CLICKHOUSE_SVC="$(zonename | tr -dc [:digit:])" + REPLICA_IDENTIFIER_01="$( echo "${REPLICA_HOST_01}" | tr -dc [:digit:])" + REPLICA_IDENTIFIER_02="$( echo "${REPLICA_HOST_02}" | tr -dc [:digit:])" + if [[ $REPLICA_IDENTIFIER_01 == $CLICKHOUSE_SVC ]] + then + REPLICA_DISPLAY_NAME="oximeter_cluster node 1" + REPLICA_NUMBER="01" + elif [[ $REPLICA_IDENTIFIER_02 == $CLICKHOUSE_SVC ]] + then + REPLICA_DISPLAY_NAME="oximeter_cluster node 2" + REPLICA_NUMBER="02" + else + printf 'ERROR: service name does not match any of the identified ClickHouse hostnames\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + # Setting environment variables this way is best practice, but has the downside of + # obscuring the field values to anyone ssh-ing into the zone. 
To mitigate this, + # we will be saving them to ${DATASTORE}/config_env_vars + export CH_LOG="${DATASTORE}/clickhouse-server.log" + export CH_ERROR_LOG="${DATASTORE}/clickhouse-server.errlog" + export CH_REPLICA_DISPLAY_NAME=${REPLICA_DISPLAY_NAME} + export CH_LISTEN_ADDR=${LISTEN_ADDR} + export CH_LISTEN_PORT=${LISTEN_PORT} + export CH_DATASTORE=${DATASTORE} + export CH_TMP_PATH="${DATASTORE}/tmp/" + export CH_USER_FILES_PATH="${DATASTORE}/user_files/" + export CH_USER_LOCAL_DIR="${DATASTORE}/access/" + export CH_FORMAT_SCHEMA_PATH="${DATASTORE}/format_schemas/" + export CH_REPLICA_NUMBER=${REPLICA_NUMBER} + export CH_REPLICA_HOST_01=${REPLICA_HOST_01} + export CH_REPLICA_HOST_02=${REPLICA_HOST_02} + export CH_KEEPER_HOST_01="${keepers[0]}" + export CH_KEEPER_HOST_02="${keepers[1]}" + export CH_KEEPER_HOST_03="${keepers[2]}" + + content="CH_LOG="${CH_LOG}"\n\ + CH_ERROR_LOG="${CH_ERROR_LOG}"\n\ + CH_REPLICA_DISPLAY_NAME="${CH_REPLICA_DISPLAY_NAME}"\n\ + CH_LISTEN_ADDR="${CH_LISTEN_ADDR}"\n\ + CH_LISTEN_PORT="${CH_LISTEN_PORT}"\n\ + CH_DATASTORE="${CH_DATASTORE}"\n\ + CH_TMP_PATH="${CH_TMP_PATH}"\n\ + CH_USER_FILES_PATH="${CH_USER_FILES_PATH}"\n\ + CH_USER_LOCAL_DIR="${CH_USER_LOCAL_DIR}"\n\ + CH_FORMAT_SCHEMA_PATH="${CH_FORMAT_SCHEMA_PATH}"\n\ + CH_REPLICA_NUMBER="${CH_REPLICA_NUMBER}"\n\ + CH_REPLICA_HOST_01="${CH_REPLICA_HOST_01}"\n\ + CH_REPLICA_HOST_02="${CH_REPLICA_HOST_02}"\n\ + CH_KEEPER_HOST_01="${CH_KEEPER_HOST_01}"\n\ + CH_KEEPER_HOST_02="${CH_KEEPER_HOST_02}"\n\ + CH_KEEPER_HOST_03="${CH_KEEPER_HOST_03}"" + + echo $content >> "${DATASTORE}/config_env_vars" + + + # The clickhouse binary must be run from within the directory that contains it. + # Otherwise, it does not automatically detect the configuration files, nor does + # it append them when necessary + cd /opt/oxide/clickhouse/ + command+=("./clickhouse" "server") +fi -exec /opt/oxide/clickhouse/clickhouse server "${args[@]}" & +exec "${command[@]}" & \ No newline at end of file diff --git a/smf/clickhouse_keeper/keeper_config.xml b/smf/clickhouse_keeper/keeper_config.xml new file mode 100644 index 0000000000..ec114694cc --- /dev/null +++ b/smf/clickhouse_keeper/keeper_config.xml @@ -0,0 +1,43 @@ + + + + trace + + + 1000M + 3 + + + + + + + + + + + + + 10000 + 30000 + trace + + + + + + 9234 + + + + + 9234 + + + + + 9234 + + + + diff --git a/smf/clickhouse_keeper/manifest.xml b/smf/clickhouse_keeper/manifest.xml new file mode 100644 index 0000000000..9e79cc131c --- /dev/null +++ b/smf/clickhouse_keeper/manifest.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/smf/clickhouse_keeper/method_script.sh b/smf/clickhouse_keeper/method_script.sh new file mode 100755 index 0000000000..0e785f2aec --- /dev/null +++ b/smf/clickhouse_keeper/method_script.sh @@ -0,0 +1,128 @@ +#!/bin/bash + +set -x +set -o errexit +set -o pipefail + +. 
/lib/svc/share/smf_include.sh + +LISTEN_ADDR="$(svcprop -c -p config/listen_addr "${SMF_FMRI}")" +LISTEN_PORT="$(svcprop -c -p config/listen_port "${SMF_FMRI}")" +DATASTORE="$(svcprop -c -p config/store "${SMF_FMRI}")" +DATALINK="$(svcprop -c -p config/datalink "${SMF_FMRI}")" +GATEWAY="$(svcprop -c -p config/gateway "${SMF_FMRI}")" + +if [[ $DATALINK == unknown ]] || [[ $GATEWAY == unknown ]]; then + printf 'ERROR: missing datalink or gateway\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" +fi + +# TODO remove when https://github.com/oxidecomputer/stlouis/issues/435 is addressed +ipadm delete-if "$DATALINK" || true +ipadm create-if -t "$DATALINK" + +ipadm set-ifprop -t -p mtu=9000 -m ipv4 "$DATALINK" +ipadm set-ifprop -t -p mtu=9000 -m ipv6 "$DATALINK" + +ipadm show-addr "$DATALINK/ll" || ipadm create-addr -t -T addrconf "$DATALINK/ll" +ipadm show-addr "$DATALINK/omicron6" || ipadm create-addr -t -T static -a "$LISTEN_ADDR" "$DATALINK/omicron6" +route get -inet6 default -inet6 "$GATEWAY" || route add -inet6 default -inet6 "$GATEWAY" + +# Retrieve hostnames (SRV records in internal DNS) of all keeper nodes. +K_ADDRS="$(/opt/oxide/internal-dns-cli/bin/dnswait clickhouse-keeper -H)" + +if [[ -z "$K_ADDRS" ]]; then + printf 'ERROR: found no hostnames for other ClickHouse Keeper nodes\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" +fi + +declare -a keepers=($K_ADDRS) + +for i in "${keepers[@]}" +do + if ! grep -q "host.control-plane.oxide.internal" <<< "${i}"; then + printf 'ERROR: retrieved ClickHouse Keeper hostname does not match the expected format\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi +done + +if [[ "${#keepers[@]}" != 3 ]] +then + printf "ERROR: expected 3 ClickHouse Keeper hosts, found "${#keepers[@]}" instead\n" >&2 + exit "$SMF_EXIT_ERR_CONFIG" +fi + +# Assign hostnames to replicas and keeper nodes +KEEPER_HOST_01="${keepers[0]}" +KEEPER_HOST_02="${keepers[1]}" +KEEPER_HOST_03="${keepers[2]}" + +# Generate unique, reproducible numeric IDs by removing the letters from KEEPER_HOST_* +# Keeper IDs must be numbers, and they cannot be reused (i.e. when a keeper node is +# unrecoverable the ID must be changed to something new). +# By trimming the hosts we can make sure all keepers will always be up to date when +# a new keeper is spun up. ClickHouse does not allow very large numbers, so we will +# be reducing to 7 characters. This should be enough entropy given the small number +# of keepers we have. +KEEPER_ID_01="$( echo "${KEEPER_HOST_01}" | tr -dc [:digit:] | cut -c1-7)" +KEEPER_ID_02="$( echo "${KEEPER_HOST_02}" | tr -dc [:digit:] | cut -c1-7)" +KEEPER_ID_03="$( echo "${KEEPER_HOST_03}" | tr -dc [:digit:] | cut -c1-7)" + +# Identify which node this is, as this will influence how the config is constructed +# TODO(https://github.com/oxidecomputer/omicron/issues/3824): There are probably much better ways to do this service name lookup, but this works +# for now. The services contain the same IDs as the hostnames. +KEEPER_SVC="$(zonename | tr -dc [:digit:] | cut -c1-7)" +if [[ $KEEPER_ID_01 == $KEEPER_SVC ]] +then + KEEPER_ID_CURRENT=$KEEPER_ID_01 +elif [[ $KEEPER_ID_02 == $KEEPER_SVC ]] +then + KEEPER_ID_CURRENT=$KEEPER_ID_02 +elif [[ $KEEPER_ID_03 == $KEEPER_SVC ]] +then + KEEPER_ID_CURRENT=$KEEPER_ID_03 +else + printf 'ERROR: service name does not match any of the identified ClickHouse Keeper hostnames\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" +fi + +# Setting environment variables this way is best practice, but has the downside of +# obscuring the field values to anyone ssh-ing into the zone.
To mitigate this, +# we will be saving them to ${DATASTORE}/config_env_vars +export CH_LOG="${DATASTORE}/clickhouse-keeper.log" +export CH_ERROR_LOG="${DATASTORE}/clickhouse-keeper.err.log" +export CH_LISTEN_ADDR=${LISTEN_ADDR} +export CH_DATASTORE=${DATASTORE} +export CH_LISTEN_PORT=${LISTEN_PORT} +export CH_KEEPER_ID_CURRENT=${KEEPER_ID_CURRENT} +export CH_LOG_STORAGE_PATH="${DATASTORE}/log" +export CH_SNAPSHOT_STORAGE_PATH="${DATASTORE}/snapshots" +export CH_KEEPER_ID_01=${KEEPER_ID_01} +export CH_KEEPER_ID_02=${KEEPER_ID_02} +export CH_KEEPER_ID_03=${KEEPER_ID_03} +export CH_KEEPER_HOST_01=${KEEPER_HOST_01} +export CH_KEEPER_HOST_02=${KEEPER_HOST_02} +export CH_KEEPER_HOST_03=${KEEPER_HOST_03} + +content="CH_LOG="${CH_LOG}"\n\ +CH_ERROR_LOG="${CH_ERROR_LOG}"\n\ +CH_LISTEN_ADDR="${CH_LISTEN_ADDR}"\n\ +CH_DATASTORE="${CH_DATASTORE}"\n\ +CH_LISTEN_PORT="${CH_LISTEN_PORT}"\n\ +CH_KEEPER_ID_CURRENT="${CH_KEEPER_ID_CURRENT}"\n\ +CH_LOG_STORAGE_PATH="${CH_LOG_STORAGE_PATH}"\n\ +CH_SNAPSHOT_STORAGE_PATH="${CH_SNAPSHOT_STORAGE_PATH}"\n\ +CH_KEEPER_ID_01="${CH_KEEPER_ID_01}"\n\ +CH_KEEPER_ID_02="${CH_KEEPER_ID_02}"\n\ +CH_KEEPER_ID_03="${CH_KEEPER_ID_03}"\n\ +CH_KEEPER_HOST_01="${CH_KEEPER_HOST_01}"\n\ +CH_KEEPER_HOST_02="${CH_KEEPER_HOST_02}"\n\ +CH_KEEPER_HOST_03="${CH_KEEPER_HOST_03}"" + +echo $content >> "${DATASTORE}/config_env_vars" + +# The clickhouse binary must be run from within the directory that contains it. +# Otherwise, it does not automatically detect the configuration files, nor does +# it append them when necessary +cd /opt/oxide/clickhouse_keeper/ +exec ./clickhouse keeper & diff --git a/smf/sled-agent/non-gimlet/config.toml b/smf/sled-agent/non-gimlet/config.toml index 63f4418eba..b4cb7e6cff 100644 --- a/smf/sled-agent/non-gimlet/config.toml +++ b/smf/sled-agent/non-gimlet/config.toml @@ -35,6 +35,10 @@ zpools = [ "oxp_f4b4dc87-ab46-49fb-a4b4-d361ae214c03", "oxp_14b4dc87-ab46-49fb-a4b4-d361ae214c03", "oxp_24b4dc87-ab46-49fb-a4b4-d361ae214c03", + "oxp_cd70d7f6-2354-4bf2-8012-55bf9eaf7930", + "oxp_ceb4461c-cf56-4719-ad3c-14430bfdfb60", + "oxp_31bd71cd-4736-4a12-a387-9b74b050396f", + "oxp_616b26df-e62a-4c68-b506-f4a923d8aaf7", ] # Percentage of usable physical DRAM to use for the VMM reservoir, which diff --git a/test-utils/src/dev/clickhouse.rs b/test-utils/src/dev/clickhouse.rs index 51c8f9965f..8e6920f0be 100644 --- a/test-utils/src/dev/clickhouse.rs +++ b/test-utils/src/dev/clickhouse.rs @@ -9,7 +9,7 @@ use std::process::Stdio; use std::time::Duration; use anyhow::Context; -use tempfile::TempDir; +use tempfile::{Builder, TempDir}; use thiserror::Error; use tokio::{ fs::File, @@ -20,9 +20,10 @@ use tokio::{ use crate::dev::poll; // Timeout used when starting up ClickHouse subprocess. -const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(30); +// build-and-test (ubuntu-20.04) needs a little longer to get going +const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(90); -/// A `ClickHouseInstance` is used to start and manage a ClickHouse server process. +/// A `ClickHouseInstance` is used to start and manage a ClickHouse single node server process. #[derive(Debug)] pub struct ClickHouseInstance { // Directory in which all data, logs, etc are stored. 
@@ -47,13 +48,16 @@ pub enum ClickHouseError { #[error("Invalid ClickHouse listening address")] InvalidAddress, + #[error("Invalid ClickHouse Keeper ID")] + InvalidKeeperId, + #[error("Failed to detect ClickHouse subprocess within timeout")] Timeout, } impl ClickHouseInstance { - /// Start a new ClickHouse server on the given IPv6 port. - pub async fn new(port: u16) -> Result { + /// Start a new single node ClickHouse server on the given IPv6 port. + pub async fn new_single_node(port: u16) -> Result { let data_dir = TempDir::new() .context("failed to create tempdir for ClickHouse data")?; let log_path = data_dir.path().join("clickhouse-server.log"); @@ -90,47 +94,8 @@ impl ClickHouseInstance { format!("failed to spawn `clickhouse` (with args: {:?})", &args) })?; - // Wait for the ClickHouse log file to become available, including the - // port number. - // - // We extract the port number from the log-file regardless of whether we - // know it already, as this is a more reliable check that the server is - // up and listening. Previously we only did this in the case we need to - // _learn_ the port, which introduces the possibility that we return - // from this function successfully, but the server itself is not yet - // ready to accept connections. let data_path = data_dir.path().to_path_buf(); - let port = poll::wait_for_condition( - || async { - let result = discover_local_listening_port( - &log_path, - CLICKHOUSE_TIMEOUT, - ) - .await; - match result { - // Successfully extracted the port, return it. - Ok(port) => Ok(port), - Err(e) => { - match e { - ClickHouseError::Io(ref inner) => { - if matches!( - inner.kind(), - std::io::ErrorKind::NotFound - ) { - return Err(poll::CondCheckError::NotYet); - } - } - _ => {} - } - Err(poll::CondCheckError::from(e)) - } - } - }, - &Duration::from_millis(500), - &CLICKHOUSE_TIMEOUT, - ) - .await - .context("waiting to discover ClickHouse port")?; + let port = wait_for_port(log_path).await?; Ok(Self { data_dir: Some(data_dir), @@ -141,6 +106,151 @@ impl ClickHouseInstance { }) } + /// Start a new replicated ClickHouse server on the given IPv6 port. 
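+ /// The arguments map onto the `CH_*` environment variables consumed by the + /// replicated server config: `port` becomes CH_LISTEN_PORT, `tcp_port` + /// CH_TCP_PORT, `interserver_port` CH_INTERSERVER_PORT, `name` + /// CH_REPLICA_DISPLAY_NAME and `r_number` CH_REPLICA_NUMBER, while + /// `config_path` is passed to `--config-file` (e.g. a test copy of + /// `config_replica.xml`). The keeper processes are expected to already be + /// running (see `new_keeper`). An illustrative call, using ClickHouse's + /// default port numbers rather than anything required here, would be + /// `new_replicated(8123, 9000, 9009, "oximeter_cluster node 1".to_string(), + /// "01".to_string(), config_path)`.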
+ pub async fn new_replicated( + port: u16, + tcp_port: u16, + interserver_port: u16, + name: String, + r_number: String, + config_path: PathBuf, + ) -> Result { + let data_dir = TempDir::new() + .context("failed to create tempdir for ClickHouse data")?; + let log_path = data_dir.path().join("clickhouse-server.log"); + let err_log_path = data_dir.path().join("clickhouse-server.errlog"); + let tmp_path = data_dir.path().join("tmp/"); + let user_files_path = data_dir.path().join("user_files/"); + let access_path = data_dir.path().join("access/"); + let format_schemas_path = data_dir.path().join("format_schemas/"); + let args = vec![ + "server".to_string(), + "--config-file".to_string(), + format!("{}", config_path.display()), + ]; + + let child = tokio::process::Command::new("clickhouse") + .args(&args) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .env("CLICKHOUSE_WATCHDOG_ENABLE", "0") + .env("CH_LOG", &log_path) + .env("CH_ERROR_LOG", err_log_path) + .env("CH_REPLICA_DISPLAY_NAME", name) + .env("CH_LISTEN_ADDR", "::") + .env("CH_LISTEN_PORT", port.to_string()) + .env("CH_TCP_PORT", tcp_port.to_string()) + .env("CH_INTERSERVER_PORT", interserver_port.to_string()) + .env("CH_DATASTORE", data_dir.path()) + .env("CH_TMP_PATH", tmp_path) + .env("CH_USER_FILES_PATH", user_files_path) + .env("CH_USER_LOCAL_DIR", access_path) + .env("CH_FORMAT_SCHEMA_PATH", format_schemas_path) + .env("CH_REPLICA_NUMBER", r_number) + // There seems to be a bug using ipv6 with a replicated set up + // when installing all servers and coordinator nodes on the same + // server. For this reason we will be using ipv4 for testing. + .env("CH_REPLICA_HOST_01", "127.0.0.1") + .env("CH_REPLICA_HOST_02", "127.0.0.1") + .env("CH_KEEPER_HOST_01", "127.0.0.1") + .env("CH_KEEPER_HOST_02", "127.0.0.1") + .env("CH_KEEPER_HOST_03", "127.0.0.1") + .spawn() + .with_context(|| { + format!("failed to spawn `clickhouse` (with args: {:?})", &args) + })?; + + let data_path = data_dir.path().to_path_buf(); + + let result = wait_for_ready(log_path).await; + match result { + Ok(()) => Ok(Self { + data_dir: Some(data_dir), + data_path, + port, + args, + child: Some(child), + }), + Err(e) => Err(e), + } + } + + /// Start a new ClickHouse keeper on the given IPv6 port. + pub async fn new_keeper( + port: u16, + k_id: u16, + config_path: PathBuf, + ) -> Result { + // We assume that only 3 keepers will be run, and the ID of the keeper can only + // be one of "1", "2" or "3". This is to avoid having to pass the IDs of the + // other keepers as part of the function's parameters. + if ![1, 2, 3].contains(&k_id) { + return Err(ClickHouseError::InvalidKeeperId.into()); + } + // Keepers do not allow a dot in the beginning of the directory, so we must + // use a prefix. 
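+ // (`TempDir::new()` would use tempfile's default `.tmp` prefix, giving a + // dot-prefixed directory name, hence the explicit `Builder` with a "k" + // prefix below.)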
+ let data_dir = Builder::new() + .prefix("k") + .tempdir() + .context("failed to create tempdir for ClickHouse Keeper data")?; + + let log_path = data_dir.path().join("clickhouse-keeper.log"); + let err_log_path = data_dir.path().join("clickhouse-keeper.err.log"); + let log_storage_path = data_dir.path().join("log"); + let snapshot_storage_path = data_dir.path().join("snapshots"); + let args = vec![ + "keeper".to_string(), + "--config-file".to_string(), + format!("{}", config_path.display()), + ]; + + let child = tokio::process::Command::new("clickhouse") + .args(&args) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .env("CLICKHOUSE_WATCHDOG_ENABLE", "0") + .env("CH_LOG", &log_path) + .env("CH_ERROR_LOG", err_log_path) + .env("CH_LISTEN_ADDR", "::") + .env("CH_LISTEN_PORT", port.to_string()) + .env("CH_KEEPER_ID_CURRENT", k_id.to_string()) + .env("CH_DATASTORE", data_dir.path()) + .env("CH_LOG_STORAGE_PATH", log_storage_path) + .env("CH_SNAPSHOT_STORAGE_PATH", snapshot_storage_path) + .env("CH_KEEPER_ID_01", "1") + .env("CH_KEEPER_ID_02", "2") + .env("CH_KEEPER_ID_03", "3") + // There seems to be a bug using ipv6 and localhost with a replicated + // set up when installing all servers and coordinator nodes on the same + // server. For this reason we will be using ipv4 for testing. + .env("CH_KEEPER_HOST_01", "127.0.0.1") + .env("CH_KEEPER_HOST_02", "127.0.0.1") + .env("CH_KEEPER_HOST_03", "127.0.0.1") + .spawn() + .with_context(|| { + format!( + "failed to spawn `clickhouse keeper` (with args: {:?})", + &args + ) + })?; + + let data_path = data_dir.path().to_path_buf(); + + let result = wait_for_ready(log_path).await; + match result { + Ok(()) => Ok(Self { + data_dir: Some(data_dir), + data_path, + port, + args, + child: Some(child), + }), + Err(e) => Err(e), + } + } + /// Wait for the ClickHouse server process to shutdown, after it's been killed. pub async fn wait_for_shutdown(&mut self) -> Result<(), anyhow::Error> { if let Some(mut child) = self.child.take() { @@ -204,6 +314,48 @@ impl Drop for ClickHouseInstance { } } +// Wait for the ClickHouse log file to become available, including the +// port number. +// +// We extract the port number from the log-file regardless of whether we +// know it already, as this is a more reliable check that the server is +// up and listening. Previously we only did this in the case we need to +// _learn_ the port, which introduces the possibility that we return +// from this function successfully, but the server itself is not yet +// ready to accept connections. +pub async fn wait_for_port(log_path: PathBuf) -> Result { + let p = poll::wait_for_condition( + || async { + let result = + discover_local_listening_port(&log_path, CLICKHOUSE_TIMEOUT) + .await; + match result { + // Successfully extracted the port, return it. + Ok(port) => Ok(port), + Err(e) => { + match e { + ClickHouseError::Io(ref inner) => { + if matches!( + inner.kind(), + std::io::ErrorKind::NotFound + ) { + return Err(poll::CondCheckError::NotYet); + } + } + _ => {} + } + Err(poll::CondCheckError::from(e)) + } + } + }, + &Duration::from_millis(500), + &CLICKHOUSE_TIMEOUT, + ) + .await + .context("waiting to discover ClickHouse port")?; + Ok(p) +} + // Parse the ClickHouse log file at the given path, looking for a line reporting the port number of // the HTTP server. This is only used if the port is chosen by the OS, not the caller. 
async fn discover_local_listening_port( @@ -257,10 +409,83 @@ async fn find_clickhouse_port_in_log( } } +// Wait for the ClickHouse log file to report it is ready to receive connections +pub async fn wait_for_ready(log_path: PathBuf) -> Result<(), anyhow::Error> { + let p = poll::wait_for_condition( + || async { + let result = discover_ready(&log_path, CLICKHOUSE_TIMEOUT).await; + match result { + Ok(ready) => Ok(ready), + Err(e) => { + match e { + ClickHouseError::Io(ref inner) => { + if matches!( + inner.kind(), + std::io::ErrorKind::NotFound + ) { + return Err(poll::CondCheckError::NotYet); + } + } + _ => {} + } + Err(poll::CondCheckError::from(e)) + } + } + }, + &Duration::from_millis(500), + &CLICKHOUSE_TIMEOUT, + ) + .await + .context("waiting to discover if ClickHouse is ready for connections")?; + Ok(p) +} + +// Parse the ClickHouse log file at the given path, looking for a line reporting that the server +// is ready for connections. +async fn discover_ready( + path: &Path, + timeout: Duration, +) -> Result<(), ClickHouseError> { + let timeout = Instant::now() + timeout; + tokio::time::timeout_at(timeout, clickhouse_ready_from_log(path)) + .await + .map_err(|_| ClickHouseError::Timeout)? +} + +// Parse the clickhouse log to know if the server is ready for connections. +// +// NOTE: This function loops forever until the expected line is found. It should be run under a +// timeout, or some other mechanism for cancelling it. +async fn clickhouse_ready_from_log(path: &Path) -> Result<(), ClickHouseError> { + let mut reader = BufReader::new(File::open(path).await?); + const READY: &str = " Application: Ready for connections"; + let mut lines = reader.lines(); + loop { + let line = lines.next_line().await?; + match line { + Some(line) => { + if let Some(_) = line.find(READY) { + return Ok(()); + } + } + None => { + // Reached EOF, just sleep for an interval and check again. + sleep(Duration::from_millis(10)).await; + + // We might have gotten a partial line; close the file, reopen + // it, and start reading again from the beginning. + reader = BufReader::new(File::open(path).await?); + lines = reader.lines(); + } + } + } +} + #[cfg(test)] mod tests { use super::{ - discover_local_listening_port, ClickHouseError, CLICKHOUSE_TIMEOUT, + discover_local_listening_port, discover_ready, ClickHouseError, + CLICKHOUSE_TIMEOUT, }; use std::process::Stdio; use std::{io::Write, sync::Arc, time::Duration}; @@ -302,6 +527,43 @@ mod tests { ); } + #[tokio::test] + async fn test_discover_clickhouse_ready() { + // Write some data to a fake log file + let mut file = NamedTempFile::new().unwrap(); + writeln!(file, "A garbage line").unwrap(); + writeln!( + file, + "2023.07.31 20:12:38.936192 [ 82373 ] Application: Ready for connections.", + ) + .unwrap(); + writeln!(file, "Another garbage line").unwrap(); + file.flush().unwrap(); + + assert!(matches!( + discover_ready(file.path(), CLICKHOUSE_TIMEOUT).await, + Ok(()) + )); + } + + #[tokio::test] + async fn test_discover_clickhouse_not_ready() { + // Write some data to a fake log file + let mut file = NamedTempFile::new().unwrap(); + writeln!(file, "A garbage line").unwrap(); + writeln!( + file, + "2023.07.31 20:12:38.936192 [ 82373 ] Application: Not ready for connections.", + ) + .unwrap(); + writeln!(file, "Another garbage line").unwrap(); + file.flush().unwrap(); + assert!(matches!( + discover_ready(file.path(), Duration::from_secs(1)).await, + Err(ClickHouseError::Timeout {}) + )); + } + // A regression test for #131. 
// // The function `discover_local_listening_port` initially read from the log file until EOF, but