Skip to content

Commit

Permalink
wip: portforward
Browse files Browse the repository at this point in the history
  • Loading branch information
ElaBosak233 committed Nov 6, 2024
1 parent 53ea6ae commit 2d08207
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 33 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ description = "The CdsCTF project is an open-source, high-performance, Jeopardy-
# Async
async-trait = { version = "0.1" }
tokio = { version = "1.41", features = ["full"] }
tokio-util = { version = "0.7.12" }
futures = { version = "^0.3" }
futures-util = { version = "^0.3" }
tower = { version = "0.5" }
Expand Down Expand Up @@ -90,7 +91,7 @@ fred = { version = "9.3", features = [
] }

# Containerization & Orchestration
kube = { version = "0.96", features = ["runtime", "derive"] }
kube = { version = "0.96", features = ["runtime", "derive", "rustls-tls", "ws"] }
k8s-openapi = { version = "0.23", features = ["latest"] }

# Miscellaneous
Expand Down
33 changes: 26 additions & 7 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{collections::BTreeMap, process, sync::OnceLock, time::Duration};

use axum::extract::ws::WebSocket;
use k8s_openapi::api::core::v1::{
Container as K8sContainer, ContainerPort, EnvVar, Pod, PodSpec, Service, ServicePort,
ServiceSpec,
Expand All @@ -11,6 +11,7 @@ use kube::{
Client as K8sClient, Config,
};
use tracing::{error, info};
use tokio_util::codec::Framed;

use crate::config;

Expand Down Expand Up @@ -48,7 +49,7 @@ pub async fn create(
injected_flag: crate::model::challenge::Flag,
) -> Result<Vec<crate::model::pod::Nat>, anyhow::Error> {
let client = get_k8s_client().clone();
let pods: Api<Pod> = Api::namespaced(
let api: Api<Pod> = Api::namespaced(
client.clone(),
config::get_config().cluster.namespace.as_str(),
);
Expand Down Expand Up @@ -93,17 +94,20 @@ pub async fn create(
name: name.clone(),
image: challenge.image_name.clone(),
env: Some(env_vars),
ports: Some(container_ports),
ports: Some(match config::get_config().cluster.proxy.enabled {
true => vec![],
false => container_ports,
}),
..Default::default()
}],
..Default::default()
}),
..Default::default()
};

pods.create(&PostParams::default(), &pod).await?;
api.create(&PostParams::default(), &pod).await?;

kube::runtime::wait::await_condition(pods.clone(), &name, conditions::is_pod_running()).await?;
kube::runtime::wait::await_condition(api.clone(), &name, conditions::is_pod_running()).await?;

// let pod = pods.get(&name).await?;

Expand Down Expand Up @@ -172,9 +176,24 @@ pub async fn create(
}

pub async fn delete(name: String) {
let pods: Api<Pod> = Api::namespaced(
let api: Api<Pod> = Api::namespaced(
get_k8s_client().clone(),
config::get_config().cluster.namespace.as_str(),
);
let _ = pods.delete(&name, &DeleteParams::default()).await;
let _ = api.delete(&name, &DeleteParams::default()).await;
}

pub async fn wsrx(name: String, port: u16, ws: WebSocket) -> Result<(), anyhow::Error> {
let api: Api<Pod> = Api::namespaced(
get_k8s_client().clone(),
config::get_config().cluster.namespace.as_str(),
);
let mut pf = api.portforward(&name, &[port]).await?;
let pfw = pf.take_stream(port);
if let Some(pfw) = pfw {
let stream = Framed::new(pfw, wsrx::proxy::MessageCodec::new());
let ws: wsrx::WrappedWsStream = ws.into();
wsrx::proxy::proxy_stream(stream, ws).await?;
}
return Ok(());
}
31 changes: 6 additions & 25 deletions src/web/router/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::{
};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use serde::Deserialize;

use tracing::debug;
use crate::{config, database::get_db, web::traits::WebError};

pub fn router() -> Router {
Expand All @@ -25,31 +25,12 @@ pub async fn link(
}

let ws = ws.unwrap();

let pod = crate::model::pod::Entity::find()
.filter(crate::model::pod::Column::Name.eq(token))
.one(&get_db())
.await
.unwrap();

if pod.is_none() {
return Err(WebError::NotFound(String::from("")));
}

let pod = pod.unwrap();

let target_nat = pod.nats.iter().find(|p| p.src == query.port.to_string());

if target_nat.is_none() {
return Err(WebError::NotFound(String::from("")));
}

let target_nat = target_nat.unwrap();
let target_port = target_nat.dst.clone().unwrap();
let target_url = format!("{}:{}", config::get_config().cluster.entry, target_port);
let port = query.port;

return Ok(ws.on_upgrade(move |socket| async move {
let tcp = tokio::net::TcpStream::connect(target_url).await.unwrap();
let _ = wsrx::proxy(socket.into(), tcp).await;
let result = crate::cluster::wsrx(token, port as u16, socket).await;
if let Err(e) = result {
debug!("Failed to link pods: {:?}", e);
}
}));
}

0 comments on commit 2d08207

Please sign in to comment.