Skip to content

Commit

Permalink
Fix k8s tls support
Browse files Browse the repository at this point in the history
  • Loading branch information
jpommerening committed Mar 18, 2021
1 parent 4d9e91e commit a2d5594
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 54 deletions.
20 changes: 15 additions & 5 deletions api/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ reqwest = { version = "0.11", features = ["json"] }
rocket = "0.4"
rocket_contrib = "0.4"
rocket-cache-response = "0.5"
rustls-pemfile = "0.2"
secstr = { version = "0.4", features = ["serde"] }
shiplift = { version = "0.7", default-features = false, features = ["unix-socket", "chrono"] }
url = { version = "2.1", features = ["serde"] }
Expand Down
3 changes: 1 addition & 2 deletions api/src/infrastructure/dummy_infrastructure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use multimap::MultiMap;
use std::collections::HashSet;
use std::sync::Mutex;
use std::time::Duration;
use uuid::Uuid;

#[cfg(test)]
pub struct DummyInfrastructure {
Expand Down Expand Up @@ -100,7 +99,7 @@ impl Infrastructure for DummyInfrastructure {

async fn deploy_services(
&self,
deployment_id: &String,
_deployment_id: &String,
app_name: &String,
configs: &Vec<ServiceConfig>,
_container_config: &ContainerConfig,
Expand Down
69 changes: 38 additions & 31 deletions api/src/infrastructure/kubernetes/infrastructure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ use chrono::{DateTime, FixedOffset, Utc};
use failure::Error;
use futures::future::join_all;
use kube::{
api::{Api, DeleteParams, ListParams, LogParams, PatchParams, PostParams, Resource},
api::{Api, DeleteParams, ListParams, LogParams, Patch, PatchParams, PostParams, Resource},
client::Client,
config::Config,
config::{Config, Der},
error::{Error as KubeError, ErrorResponse},
};
use k8s_openapi::api::{
Expand All @@ -52,19 +52,17 @@ use k8s_openapi::api::{
core::v1::Service as V1Service,
};
use multimap::MultiMap;
use reqwest::Certificate;
use secstr::SecUtf8;
use std::collections::BTreeMap;
use std::convert::{From, TryFrom};
use std::net::IpAddr;
use std::path::PathBuf;
use std::str::FromStr;
use url::Url;
use kube::api::Patch;

pub struct KubernetesInfrastructure {
cluster_endpoint: Url,
cluster_ca: Option<Certificate>,
cluster_ca: Option<Vec<Vec<u8>>>,
cluster_token: Option<SecUtf8>,
}

Expand Down Expand Up @@ -97,7 +95,7 @@ pub enum KubernetesInfrastructureError {
impl KubernetesInfrastructure {
pub fn new(
cluster_endpoint: Url,
cluster_ca: Option<Certificate>,
cluster_ca: Option<Vec<Vec<u8>>>,
cluster_token: Option<SecUtf8>,
) -> Self {
KubernetesInfrastructure {
Expand All @@ -107,23 +105,32 @@ impl KubernetesInfrastructure {
}
}

fn client(&self) -> Client {
fn client(&self) -> Result<Client, KubernetesInfrastructureError> {
use reqwest::header::{self, HeaderValue};
let mut headers = header::HeaderMap::new();
let mut root_cert: Option<Vec<Der>> = None;

if let Some(token) = &self.cluster_token {
let token_header_value =
HeaderValue::from_str(&format!("Bearer {}", token.unsecure())).unwrap();
headers.insert(header::AUTHORIZATION, token_header_value);
}

if let Some(certs) = &self.cluster_ca {
root_cert = Some(certs.into_iter()
.map(|cert| Der(cert.clone()))
.collect());
}

let mut configuration = Config::new(self.cluster_endpoint.clone());
configuration.default_ns = String::from("default");
// kube now wants a DER cert :(
// configuration.root_cert = self.cluster_ca;
configuration.root_cert = root_cert;
configuration.headers = headers;

Client::new(configuration)
Client::try_from(configuration)
.map_err(|err| KubernetesInfrastructureError::UnexpectedError {
internal_message: format!("Failed to create client: {}", err)
})
}

async fn create_service_from(
Expand All @@ -149,7 +156,7 @@ impl KubernetesInfrastructure {
.current_config()
.map_or_else(|| "", |config| config.service_name()),
));
if let Some(pod) = Api::<V1Pod>::namespaced(self.client(), &namespace)
if let Some(pod) = Api::<V1Pod>::namespaced(self.client()?, &namespace)
.list(&p)
.await?
.items
Expand Down Expand Up @@ -198,7 +205,7 @@ impl KubernetesInfrastructure {
));

let mut services = Vec::new();
let futures = Api::<V1Deployment>::all(self.client())
let futures = Api::<V1Deployment>::all(self.client()?)
.list(&p)
.await?
.items
Expand Down Expand Up @@ -232,7 +239,7 @@ impl KubernetesInfrastructure {
APP_NAME_LABEL, app_name, SERVICE_NAME_LABEL, service_name
));

match Api::<V1Deployment>::all(self.client())
match Api::<V1Deployment>::all(self.client()?)
.list(&p)
.await?
.items
Expand All @@ -258,7 +265,7 @@ impl KubernetesInfrastructure {
SERVICE_NAME_LABEL,
service_config.service_name(),
));
Api::<V1Service>::namespaced(self.client(), &app_name)
Api::<V1Service>::namespaced(self.client()?, &app_name)
.create(
&PostParams::default(),
&serde_json::from_value(service_payload(app_name, service_config))?,
Expand All @@ -275,7 +282,7 @@ impl KubernetesInfrastructure {
ingress_route_payload(app_name, service_config).to_string().into_bytes(),
)?;

self.client().request_text(request).await?;
self.client()?.request_text(request).await?;

let request = Resource::dynamic("middlewares")
.group("traefik.containo.us")
Expand All @@ -287,7 +294,7 @@ impl KubernetesInfrastructure {
middleware_payload(app_name, service_config).to_string().into_bytes(),
)?;

self.client().request_text(request).await?;
self.client()?.request_text(request).await?;

Ok(())
}
Expand All @@ -296,7 +303,7 @@ impl KubernetesInfrastructure {
&self,
app_name: &String,
) -> Result<(), KubernetesInfrastructureError> {
match Api::<V1Namespace>::all(self.client())
match Api::<V1Namespace>::all(self.client()?)
.create(
&PostParams::default(),
&serde_json::from_value(namespace_payload(app_name))?,
Expand Down Expand Up @@ -329,7 +336,7 @@ impl KubernetesInfrastructure {
.await?;
}

match Api::<V1Deployment>::namespaced(self.client(), &app_name)
match Api::<V1Deployment>::namespaced(self.client()?, &app_name)
.create(
&PostParams::default(),
&serde_json::from_value(
Expand All @@ -346,7 +353,7 @@ impl KubernetesInfrastructure {
}

Err(KubeError::Api(ErrorResponse { code, .. })) if code == 409 => {
Api::<V1Deployment>::namespaced(self.client(), &app_name)
Api::<V1Deployment>::namespaced(self.client()?, &app_name)
.patch(
&format!("{}-{}-deployment", app_name, service_config.service_name()),
&PatchParams::default(),
Expand Down Expand Up @@ -376,7 +383,7 @@ impl KubernetesInfrastructure {
app_name
);

match Api::<V1Secret>::namespaced(self.client(), &app_name)
match Api::<V1Secret>::namespaced(self.client()?, &app_name)
.create(
&PostParams::default(),
&serde_json::from_value(
Expand All @@ -390,7 +397,7 @@ impl KubernetesInfrastructure {
Ok(())
}
Err(KubeError::Api(ErrorResponse { code, .. })) if code == 409 => {
Api::<V1Secret>::namespaced(self.client(), &app_name)
Api::<V1Secret>::namespaced(self.client()?, &app_name)
.patch(
&format!("{}-{}-secret", app_name, service_config.service_name()),
&PatchParams::default(),
Expand All @@ -413,13 +420,13 @@ impl KubernetesInfrastructure {
app_name: &String,
service: &'a Service,
) -> Result<&'a Service, KubernetesInfrastructureError> {
Api::<V1Deployment>::namespaced(self.client(), &service.app_name())
Api::<V1Deployment>::namespaced(self.client()?, &service.app_name())
.delete(
&format!("{}-{}-deployment", app_name, service.service_name()),
&DeleteParams::default(),
)
.await?;
Api::<V1Service>::namespaced(self.client(), &service.app_name())
Api::<V1Service>::namespaced(self.client()?, &service.app_name())
.delete(service.service_name(), &DeleteParams::default())
.await?;
let request = Resource::dynamic("ingressroutes")
Expand All @@ -431,7 +438,7 @@ impl KubernetesInfrastructure {
&format!("{}-{}-ingress-route", app_name, service.service_name()),
&DeleteParams::default(),
)?;
self.client().request_text(request).await?;
self.client()?.request_text(request).await?;
let request = Resource::dynamic("middlewares")
.group("traefik.containo.us")
.version("v1alpha1")
Expand All @@ -441,7 +448,7 @@ impl KubernetesInfrastructure {
&format!("{}-{}-middleware", app_name, service.service_name()),
&DeleteParams::default(),
)?;
self.client().request_text(request).await?;
self.client()?.request_text(request).await?;

Ok(service)
}
Expand All @@ -454,7 +461,7 @@ impl Infrastructure for KubernetesInfrastructure {
p.label_selector = Some(format!("{},{}", APP_NAME_LABEL, SERVICE_NAME_LABEL));

let mut apps = MultiMap::new();
for deployment in Api::<V1Deployment>::all(self.client())
for deployment in Api::<V1Deployment>::all(self.client()?)
.list(&p)
.await?
.items
Expand Down Expand Up @@ -512,7 +519,7 @@ impl Infrastructure for KubernetesInfrastructure {
stop_service_result?;
}

Api::<V1Namespace>::all(self.client())
Api::<V1Namespace>::all(self.client()?)
.delete(app_name, &DeleteParams::default())
.await?;

Expand All @@ -531,7 +538,7 @@ impl Infrastructure for KubernetesInfrastructure {
"{}={},{}={}",
APP_NAME_LABEL, app_name, SERVICE_NAME_LABEL, service_name,
));
let pod = match Api::<V1Pod>::namespaced(self.client(), &app_name)
let pod = match Api::<V1Pod>::namespaced(self.client()?, &app_name)
.list(&p)
.await?
.into_iter()
Expand Down Expand Up @@ -560,7 +567,7 @@ impl Infrastructure for KubernetesInfrastructure {
})
.filter(|since_seconds| since_seconds > &0);

let logs = Api::<V1Pod>::namespaced(self.client(), &app_name)
let logs = Api::<V1Pod>::namespaced(self.client()?, &app_name)
.logs(&pod.metadata.name.unwrap(), &p)
.await?;

Expand Down Expand Up @@ -604,7 +611,7 @@ impl Infrastructure for KubernetesInfrastructure {
None => return Ok(None),
};

Api::<V1Deployment>::namespaced(self.client(), &app_name)
Api::<V1Deployment>::namespaced(self.client()?, &app_name)
.patch(
&format!("{}-{}-deployment", app_name, service_name),
&PatchParams::default(),
Expand Down Expand Up @@ -760,7 +767,7 @@ mod tests {
use super::*;
use crate::models::EnvironmentVariable;
use k8s_openapi::api::apps::v1::DeploymentSpec;
use kube::api::{Object, ObjectMeta, TypeMeta};
use kube::api::{ObjectMeta};
use std::collections::BTreeMap;

macro_rules! deployment_object {
Expand Down
Loading

0 comments on commit a2d5594

Please sign in to comment.