From a2d5594fb2a3a61e66d2816cd9acd5bd717e9f18 Mon Sep 17 00:00:00 2001 From: Jonas Pommerening Date: Thu, 18 Mar 2021 14:17:08 +0100 Subject: [PATCH] Fix k8s tls support --- api/Cargo.lock | 20 ++++-- api/Cargo.toml | 1 + .../infrastructure/dummy_infrastructure.rs | 3 +- .../kubernetes/infrastructure.rs | 69 ++++++++++--------- api/src/main.rs | 39 ++++++----- 5 files changed, 78 insertions(+), 54 deletions(-) diff --git a/api/Cargo.lock b/api/Cargo.lock index a7e3114..4acd9ff 100644 --- a/api/Cargo.lock +++ b/api/Cargo.lock @@ -1361,7 +1361,7 @@ dependencies = [ "serde_yaml", "static_assertions", "thiserror", - "time 0.2.25", + "time 0.2.26", "tokio 1.3.0", "url 2.2.1", ] @@ -1386,9 +1386,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03b07a082330a35e43f63177cc01689da34fbffa0105e1246cf0311472cac73a" +checksum = "538c092e5586f4cdd7dd8078c4a79220e3e168880218124dcbce860f0ea938c6" [[package]] name = "libflate" @@ -1917,6 +1917,7 @@ dependencies = [ "rocket", "rocket-cache-response", "rocket_contrib", + "rustls-pemfile", "schemars", "secstr", "serde", @@ -2265,6 +2266,15 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls-pemfile" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09700171bbcc799d113f2c675314d6005c3dc035f3e7307cf3e7fd459ccbe246" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "ryu" version = "1.0.5" @@ -2784,9 +2794,9 @@ dependencies = [ [[package]] name = "time" -version = "0.2.25" +version = "0.2.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1195b046942c221454c2539395f85413b33383a067449d78aab2b7b052a142f7" +checksum = "08a8cbfbf47955132d0202d1662f49b2423ae35862aee471f3ba4b133358f372" dependencies = [ "const_fn", "libc", diff --git a/api/Cargo.toml b/api/Cargo.toml index 79eadb8..272c8b7 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -36,6 +36,7 @@ reqwest = { version = "0.11", features = ["json"] } rocket = "0.4" rocket_contrib = "0.4" rocket-cache-response = "0.5" +rustls-pemfile = "0.2" secstr = { version = "0.4", features = ["serde"] } shiplift = { version = "0.7", default-features = false, features = ["unix-socket", "chrono"] } url = { version = "2.1", features = ["serde"] } diff --git a/api/src/infrastructure/dummy_infrastructure.rs b/api/src/infrastructure/dummy_infrastructure.rs index 58cd00e..23dfe55 100644 --- a/api/src/infrastructure/dummy_infrastructure.rs +++ b/api/src/infrastructure/dummy_infrastructure.rs @@ -34,7 +34,6 @@ use multimap::MultiMap; use std::collections::HashSet; use std::sync::Mutex; use std::time::Duration; -use uuid::Uuid; #[cfg(test)] pub struct DummyInfrastructure { @@ -100,7 +99,7 @@ impl Infrastructure for DummyInfrastructure { async fn deploy_services( &self, - deployment_id: &String, + _deployment_id: &String, app_name: &String, configs: &Vec, _container_config: &ContainerConfig, diff --git a/api/src/infrastructure/kubernetes/infrastructure.rs b/api/src/infrastructure/kubernetes/infrastructure.rs index c993cf5..ad5418d 100644 --- a/api/src/infrastructure/kubernetes/infrastructure.rs +++ b/api/src/infrastructure/kubernetes/infrastructure.rs @@ -39,9 +39,9 @@ use chrono::{DateTime, FixedOffset, Utc}; use failure::Error; use futures::future::join_all; use kube::{ - api::{Api, DeleteParams, ListParams, LogParams, PatchParams, PostParams, Resource}, + api::{Api, DeleteParams, ListParams, LogParams, Patch, PatchParams, PostParams, Resource}, client::Client, - config::Config, + config::{Config, Der}, error::{Error as KubeError, ErrorResponse}, }; use k8s_openapi::api::{ @@ -52,7 +52,6 @@ use k8s_openapi::api::{ core::v1::Service as V1Service, }; use multimap::MultiMap; -use reqwest::Certificate; use secstr::SecUtf8; use std::collections::BTreeMap; use std::convert::{From, TryFrom}; @@ -60,11 +59,10 @@ use std::net::IpAddr; use std::path::PathBuf; use std::str::FromStr; use url::Url; -use kube::api::Patch; pub struct KubernetesInfrastructure { cluster_endpoint: Url, - cluster_ca: Option, + cluster_ca: Option>>, cluster_token: Option, } @@ -97,7 +95,7 @@ pub enum KubernetesInfrastructureError { impl KubernetesInfrastructure { pub fn new( cluster_endpoint: Url, - cluster_ca: Option, + cluster_ca: Option>>, cluster_token: Option, ) -> Self { KubernetesInfrastructure { @@ -107,9 +105,10 @@ impl KubernetesInfrastructure { } } - fn client(&self) -> Client { + fn client(&self) -> Result { use reqwest::header::{self, HeaderValue}; let mut headers = header::HeaderMap::new(); + let mut root_cert: Option> = None; if let Some(token) = &self.cluster_token { let token_header_value = @@ -117,13 +116,21 @@ impl KubernetesInfrastructure { headers.insert(header::AUTHORIZATION, token_header_value); } + if let Some(certs) = &self.cluster_ca { + root_cert = Some(certs.into_iter() + .map(|cert| Der(cert.clone())) + .collect()); + } + let mut configuration = Config::new(self.cluster_endpoint.clone()); configuration.default_ns = String::from("default"); - // kube now wants a DER cert :( - // configuration.root_cert = self.cluster_ca; + configuration.root_cert = root_cert; configuration.headers = headers; - Client::new(configuration) + Client::try_from(configuration) + .map_err(|err| KubernetesInfrastructureError::UnexpectedError { + internal_message: format!("Failed to create client: {}", err) + }) } async fn create_service_from( @@ -149,7 +156,7 @@ impl KubernetesInfrastructure { .current_config() .map_or_else(|| "", |config| config.service_name()), )); - if let Some(pod) = Api::::namespaced(self.client(), &namespace) + if let Some(pod) = Api::::namespaced(self.client()?, &namespace) .list(&p) .await? .items @@ -198,7 +205,7 @@ impl KubernetesInfrastructure { )); let mut services = Vec::new(); - let futures = Api::::all(self.client()) + let futures = Api::::all(self.client()?) .list(&p) .await? .items @@ -232,7 +239,7 @@ impl KubernetesInfrastructure { APP_NAME_LABEL, app_name, SERVICE_NAME_LABEL, service_name )); - match Api::::all(self.client()) + match Api::::all(self.client()?) .list(&p) .await? .items @@ -258,7 +265,7 @@ impl KubernetesInfrastructure { SERVICE_NAME_LABEL, service_config.service_name(), )); - Api::::namespaced(self.client(), &app_name) + Api::::namespaced(self.client()?, &app_name) .create( &PostParams::default(), &serde_json::from_value(service_payload(app_name, service_config))?, @@ -275,7 +282,7 @@ impl KubernetesInfrastructure { ingress_route_payload(app_name, service_config).to_string().into_bytes(), )?; - self.client().request_text(request).await?; + self.client()?.request_text(request).await?; let request = Resource::dynamic("middlewares") .group("traefik.containo.us") @@ -287,7 +294,7 @@ impl KubernetesInfrastructure { middleware_payload(app_name, service_config).to_string().into_bytes(), )?; - self.client().request_text(request).await?; + self.client()?.request_text(request).await?; Ok(()) } @@ -296,7 +303,7 @@ impl KubernetesInfrastructure { &self, app_name: &String, ) -> Result<(), KubernetesInfrastructureError> { - match Api::::all(self.client()) + match Api::::all(self.client()?) .create( &PostParams::default(), &serde_json::from_value(namespace_payload(app_name))?, @@ -329,7 +336,7 @@ impl KubernetesInfrastructure { .await?; } - match Api::::namespaced(self.client(), &app_name) + match Api::::namespaced(self.client()?, &app_name) .create( &PostParams::default(), &serde_json::from_value( @@ -346,7 +353,7 @@ impl KubernetesInfrastructure { } Err(KubeError::Api(ErrorResponse { code, .. })) if code == 409 => { - Api::::namespaced(self.client(), &app_name) + Api::::namespaced(self.client()?, &app_name) .patch( &format!("{}-{}-deployment", app_name, service_config.service_name()), &PatchParams::default(), @@ -376,7 +383,7 @@ impl KubernetesInfrastructure { app_name ); - match Api::::namespaced(self.client(), &app_name) + match Api::::namespaced(self.client()?, &app_name) .create( &PostParams::default(), &serde_json::from_value( @@ -390,7 +397,7 @@ impl KubernetesInfrastructure { Ok(()) } Err(KubeError::Api(ErrorResponse { code, .. })) if code == 409 => { - Api::::namespaced(self.client(), &app_name) + Api::::namespaced(self.client()?, &app_name) .patch( &format!("{}-{}-secret", app_name, service_config.service_name()), &PatchParams::default(), @@ -413,13 +420,13 @@ impl KubernetesInfrastructure { app_name: &String, service: &'a Service, ) -> Result<&'a Service, KubernetesInfrastructureError> { - Api::::namespaced(self.client(), &service.app_name()) + Api::::namespaced(self.client()?, &service.app_name()) .delete( &format!("{}-{}-deployment", app_name, service.service_name()), &DeleteParams::default(), ) .await?; - Api::::namespaced(self.client(), &service.app_name()) + Api::::namespaced(self.client()?, &service.app_name()) .delete(service.service_name(), &DeleteParams::default()) .await?; let request = Resource::dynamic("ingressroutes") @@ -431,7 +438,7 @@ impl KubernetesInfrastructure { &format!("{}-{}-ingress-route", app_name, service.service_name()), &DeleteParams::default(), )?; - self.client().request_text(request).await?; + self.client()?.request_text(request).await?; let request = Resource::dynamic("middlewares") .group("traefik.containo.us") .version("v1alpha1") @@ -441,7 +448,7 @@ impl KubernetesInfrastructure { &format!("{}-{}-middleware", app_name, service.service_name()), &DeleteParams::default(), )?; - self.client().request_text(request).await?; + self.client()?.request_text(request).await?; Ok(service) } @@ -454,7 +461,7 @@ impl Infrastructure for KubernetesInfrastructure { p.label_selector = Some(format!("{},{}", APP_NAME_LABEL, SERVICE_NAME_LABEL)); let mut apps = MultiMap::new(); - for deployment in Api::::all(self.client()) + for deployment in Api::::all(self.client()?) .list(&p) .await? .items @@ -512,7 +519,7 @@ impl Infrastructure for KubernetesInfrastructure { stop_service_result?; } - Api::::all(self.client()) + Api::::all(self.client()?) .delete(app_name, &DeleteParams::default()) .await?; @@ -531,7 +538,7 @@ impl Infrastructure for KubernetesInfrastructure { "{}={},{}={}", APP_NAME_LABEL, app_name, SERVICE_NAME_LABEL, service_name, )); - let pod = match Api::::namespaced(self.client(), &app_name) + let pod = match Api::::namespaced(self.client()?, &app_name) .list(&p) .await? .into_iter() @@ -560,7 +567,7 @@ impl Infrastructure for KubernetesInfrastructure { }) .filter(|since_seconds| since_seconds > &0); - let logs = Api::::namespaced(self.client(), &app_name) + let logs = Api::::namespaced(self.client()?, &app_name) .logs(&pod.metadata.name.unwrap(), &p) .await?; @@ -604,7 +611,7 @@ impl Infrastructure for KubernetesInfrastructure { None => return Ok(None), }; - Api::::namespaced(self.client(), &app_name) + Api::::namespaced(self.client()?, &app_name) .patch( &format!("{}-{}-deployment", app_name, service_name), &PatchParams::default(), @@ -760,7 +767,7 @@ mod tests { use super::*; use crate::models::EnvironmentVariable; use k8s_openapi::api::apps::v1::DeploymentSpec; - use kube::api::{Object, ObjectMeta, TypeMeta}; + use kube::api::{ObjectMeta}; use std::collections::BTreeMap; macro_rules! deployment_object { diff --git a/api/src/main.rs b/api/src/main.rs index 854f407..98f9d7f 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -46,13 +46,12 @@ use crate::infrastructure::{Docker, Infrastructure, Kubernetes}; use crate::models::request_info::RequestInfo; use clap::{App, Arg}; use env_logger::Env; -use reqwest::Certificate; use rocket::response::NamedFile; use rocket_cache_response::CacheResponse; use secstr::SecUtf8; use serde_yaml::{from_reader, to_string, Value}; use std::fs::File; -use std::io::Read; +use std::io::{Read, BufReader}; use std::path::{Path, PathBuf}; use std::process; use std::str::FromStr; @@ -175,14 +174,14 @@ fn read_token_file(path: &PathBuf) -> Result { Ok(SecUtf8::from(token)) } -fn read_ca_file(path: &PathBuf) -> Result { +fn read_ca_file(path: &PathBuf) -> Result>, StartUpError> { debug!( "Reading certificate authority from {}.", path.to_str().unwrap() ); - let mut f = match File::open(path) { - Ok(f) => f, + let mut rd = match File::open(path) { + Ok(f) => BufReader::new(f), Err(e) => { return Err(StartUpError::CannotReadCertificateAuthority { path: String::from(path.to_str().unwrap()), @@ -191,21 +190,29 @@ fn read_ca_file(path: &PathBuf) -> Result { } }; - let mut buf = Vec::new(); - if let Err(e) = f.read_to_end(&mut buf) { - return Err(StartUpError::CannotReadCertificateAuthority { - path: String::from(path.to_str().unwrap()), - err: format!("{}", e), - }); - } - - match Certificate::from_pem(&buf) { - Ok(cert) => Ok(cert), + match rustls_pemfile::certs(&mut rd) { + Ok(certs) => Ok(certs), Err(e) => Err(StartUpError::CannotReadCertificateAuthority { path: String::from(path.to_str().unwrap()), err: format!("{}", e), - }), + }) } + + //let mut buf = Vec::new(); + //if let Err(e) = f.read_to_end(&mut buf) { + // return Err(StartUpError::CannotReadCertificateAuthority { + // path: String::from(path.to_str().unwrap()), + // err: format!("{}", e), + // }); + //} + + //match Certificate::from_pem(&buf) { + // Ok(cert) => Ok(cert), + // Err(e) => Err(StartUpError::CannotReadCertificateAuthority { + // path: String::from(path.to_str().unwrap()), + // err: format!("{}", e), + // }), + //} } fn main() -> Result<(), StartUpError> {