Skip to content

Commit

Permalink
Implement TLS support for incoming redis connections (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Sep 7, 2021
1 parent d0dcf4d commit 39a0cba
Show file tree
Hide file tree
Showing 15 changed files with 376 additions and 46 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ rand = { version = "0.8.4", features = ["small_rng", "std"]}
rand_distr = "0.4.1"
cached = "0.25.0"
pin-project-lite = "0.2"
tokio-openssl = "0.6.2"
openssl = { version = "0.10.36", features = ["vendored"] }

# Error handling
thiserror = "1.0"
Expand Down
6 changes: 6 additions & 0 deletions shotover-proxy/examples/redis-tls/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: "3.3"
services:
redis-one:
image: library/redis:5.0.9
ports:
- "1111:6379"
3 changes: 3 additions & 0 deletions shotover-proxy/examples/redis-tls/redis-cli.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh

redis-cli --tls --cert tls_keys/redis.crt --key tls_keys/redis.key --cacert tls_keys/ca.crt "$@"
31 changes: 31 additions & 0 deletions shotover-proxy/examples/redis-tls/tls_keys/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIFSzCCAzOgAwIBAgIUWPH3dRgC/YwYZZCXib8gwJ+O8xYwDQYJKoZIhvcNAQEL
BQAwNTETMBEGA1UECgwKUmVkaXMgVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUg
QXV0aG9yaXR5MB4XDTIxMDgxNjA1NTcxN1oXDTMxMDgxNDA1NTcxN1owNTETMBEG
A1UECgwKUmVkaXMgVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAw+X26W/dGC+2sJHiJkbf
Hq39KSWoXrCvXFP6FXuGqdEIZzaUokOSrNR0aueasOyc3hQ+B+N1DyPlPXDYxcll
yZdcbRvUT85tprheke6P1ofA369fWpm6aZIQf1ahsBpZ5KLwavqo1MC/5eqvvloy
vwomLvmjONSjnbvbofgRkFH0mETo91qh9tAK6wLSEUwZhKFsBIajP5cQ+IlL8QVi
0sIc5fIrmAPUdK2NwI1ly1L48BuBT2n46nm3J9ZX5Px+9NvsDT7VA+g5o1Pfer62
gfmEdQdWzdRBWRw6HLMuvN3w6mL/3++W5LzhF846szebzhue84XzuGSJT5i6s3LP
dgWTweJTLP7imb2JFaZDcAwDPMXUhYqhU6wst+mlsqUX9XHR9MajZluxIQLXAom/
ZUSVYjlOvtTmFAOEqe/eKxM/ZX4+umnQDIMirvtxOV8jtO02/Yog2nSs3XLNQXL5
tIwDX2A/HErzvSOsMMwKggUBChBorELaSzy1bMNoSw0t6CQ+74fT7iV7nWKKvpmf
Zfeb7WOu2XF/zjNyy9ghQkiARTupCzZY2G7kCMCtXy7Q/I3ip5LWhiywwDOgkibI
Elp0AZEy3MvLfM/7kFroWd7NFQ739o7nkU1G6hhgKIGzUqP83ozwzjHuDvrE2yXw
Pi41ze7Iy+GvlKPgkwsVR9MCAwEAAaNTMFEwHQYDVR0OBBYEFLw7OfSQI/lsTRe/
4Pu/ctJDj/a6MB8GA1UdIwQYMBaAFLw7OfSQI/lsTRe/4Pu/ctJDj/a6MA8GA1Ud
EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggIBACwGR6DnefMO3n52bfujvwan
qRnH87IlnFrGDW1yXNAfs/9hh8UHPRNgvOzU/OlJBMWF0r+T0QwChcgqwZ/kQ5RG
GbkZeAA69Ail+LYra6p9r6tuTyZtGkpOLI+w73Qb6GS4q5agYMJjz7OU3M7m7dsw
lkYfqSxHtXc5otdxG9dlblRW5jpEoD9ry5Oh7f1FIGMgncOvYGX2zgLUr2Qn65aL
+Ung0KTmZtwPjzfWzHQEPx0hFcnh/K3xilhJ4P+8lc71aWpjnffvrONlX0dJzSzm
3xkCkorHagnAFoJwIM5/boe77PaHb4A4ehmkzKYIzMxbfGnePkUwFmGhmxgTHgtu
67voFBfIuWoEZNdl44Mb28JY7zYTJ81fi/RSnxqMg+2oxaqZlF+Y/c81fHBzOaSM
/txPmt5WWqC6aId+Mmmnmx1IpEvooc0KI3c+XPikHEWx/Z+FPD595vUaq6aiuzSs
tAvk6Z1OiArBKQcurQoHBJqfOQiu9f3+VebZ7sG3f7aRLDWpfF3JB6sHKlGl1tXY
HWBg+46ezLT3JJn820YZ2j8zZY0fGx5OwMwM64WLyIaxIiHhh+TtLaOy5+97S4LI
syet2sEZHPc3IPoBxMzeWbAt+qi74baX1U91M2/eHVPArEm3SXs0I6MFn/aQJ3IN
yAUJEnGE62qXdU0pa/LQ
-----END CERTIFICATE-----
23 changes: 23 additions & 0 deletions shotover-proxy/examples/redis-tls/tls_keys/redis.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID6DCCAdACFDSqN+WRPnbixT+8vJtDyZZnjxhaMA0GCSqGSIb3DQEBCwUAMDUx
EzARBgNVBAoMClJlZGlzIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhv
cml0eTAeFw0yMTA4MTYwNTU3MTdaFw0yMjA4MTYwNTU3MTdaMCwxEzARBgNVBAoM
ClJlZGlzIFRlc3QxFTATBgNVBAMMDEdlbmVyaWMtY2VydDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBAN7ySN3uxTS3ZzPewalkuJI0vZ9JD77ImvNvkgTR
CR83dnfgf9eY7y2PHgPKEit9Gh4e633QeZAiMjvjHTM9MZCkyhusp2/yDOX8WUWx
EZQbqf7UIzoxsXi3+TeKkpuxCUi1DYfMTd4EtJqFANZ7FM9j6/2AuWCj8s2hjXIs
iUSZWiYdqS1/BkGTxRKQs5SJD8ZtgCj2Yf8IdxpW87RTQPP2yGaHdcX9IY0S2BIY
FuZ8uQ8k+fWBWuRgh2CZmS96/4s8enyo32AYgODIJb+FG+gpfuCXGhqYsyCHetJ2
6nDODoB/nMGGdGwQ65KTcwnie6M2Fuvbu8AyTH9UhURBjY8CAwEAATANBgkqhkiG
9w0BAQsFAAOCAgEAX2uLRTsYvnIWq13p0h3BwTZa2EHUC2XeTVRgMPxLqz7loUo6
ty29aDaFuLgkn0Fc5jU4kChYtSRnZXbAanmVnwtk6uOYDPFLm8MHoH61+fipXib/
IINoz4JindM8NiKQwA5xEcefnNpmQVt5TTx3H7cR5WBa0o/eCyGMyFWWWhbr5+Xg
Tye5y2uE2AQnUYLW1rb2K5VRLROvh8VN6O+T50vHLblUJ7YGhF17gDUPtO6qMtgQ
A4+yrR8JBMriweZlNULtOOsd50ubQTEGPyNCnXXw5/+g+/YG0WAgb2DccRavX4ux
sSMij9iP9pWTAosw110sYL/syS3xLTwk4TGZv1lw0gGJW85OI6TKGVP0d0Gs9NGg
e5nvOk48btMXqHfXMFO0AqkGcZd/cGNGsng06nYEnmaBkLXhTelphLZmTOmwWDTE
L9Y2/c31ltnWP1aO95QfDKQUWQN8j+VZJn5ZhFZtc3DqhUxFDX1q1GaicQGGE+4w
66bfO33XZwo9VvKAJ8SCmLWNMjyqGiWhNedfVbV73GGwlQEGuvfKipJrilcDyvII
oRRI+88gpjgzQAmhkQQVfv0Fed232JRB3moLR+H2GGBVLRUeVGJiYIGGTTiKpdkF
FMkekmq5U+olKjzNbD97oFujRf4kGjyvFv5iDUAWyA34aTvXG/ax3RfSSUc=
-----END CERTIFICATE-----
27 changes: 27 additions & 0 deletions shotover-proxy/examples/redis-tls/tls_keys/redis.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA3vJI3e7FNLdnM97BqWS4kjS9n0kPvsia82+SBNEJHzd2d+B/
15jvLY8eA8oSK30aHh7rfdB5kCIyO+MdMz0xkKTKG6ynb/IM5fxZRbERlBup/tQj
OjGxeLf5N4qSm7EJSLUNh8xN3gS0moUA1nsUz2Pr/YC5YKPyzaGNciyJRJlaJh2p
LX8GQZPFEpCzlIkPxm2AKPZh/wh3GlbztFNA8/bIZod1xf0hjRLYEhgW5ny5DyT5
9YFa5GCHYJmZL3r/izx6fKjfYBiA4Mglv4Ub6Cl+4JcaGpizIId60nbqcM4OgH+c
wYZ0bBDrkpNzCeJ7ozYW69u7wDJMf1SFREGNjwIDAQABAoIBAQCd3Mb1xq0Cv96m
ntNYI4aBmDCwieZJO/hj6Rtmp7Ei1n3fMiqzuYmaI92n5zxoaMPGkjRDOQvqoBwb
xZwWfP0Mo3Ksl9tTa/vwGXgI3fFJgzEizIlJGojEptgjmM5oybl+Lx0ui96TF5fe
VdEbDbnVg7ZFIRGiOVSigAMM1jY4SF9kTl6El5wLa/dEIV9Pmq9+A65nBOKZTHbU
urJsMtUOFH98GfcnfSvNtGSe7lNZZbUIdCEz0X3DBNk54zalvHSliYVCwM1Zlp8t
LgJzeCmfpqxDplpgaAFwOyc1TeXWw8fduDlWw+RIe7Ck4d2FIk9C/ATkdzmGa9YV
D8cr2tiJAoGBAPPYp5AHipyreb5AJgCgaceNrZdDITB/XBr++Suyybz1Ir8Z+3W5
yCxgAj2qoGhMOS6CGp+KttTk7LzFInhXA+hlaICrsWiChgz1BWpI0VTvsOv09skh
i9rYCoVUQ9nB9nvztXbix2SW4vfXqS+7karQwH10wDAGqaR2yfbn0uxVAoGBAOoO
9YL9qRLXp+qhn8F2Z0Hc1flbCEj+PGBs5NmazJuQIjnluaHqJTHuejexabZcS4bl
5WvPubHH2igwCB/5ZiWNQNIF7+lVcvAAoQ4gxByLvHqRcp3CqNFgAJhCVQ1srAgX
JCEvwGK1DmQAdINtCIdmQRqOwb7xeHqSPM2d8TZTAoGAPwO2CZppT6NgirG5IGBT
9aW/Pl+yq/29p5xMd+Z0C8itegUU3o9sE0ucSKNXYJySClrE1oXaSZn/M6keB0s1
T1EleFrmNcPFMIQBKj43GmP2rINZYxCwO5Wo4lusTRG6yL1qH5brQCtd6/5nUlZ+
hk378G/DWqXeIQoxlwTBlSECgYEAx0p5bVGCtpJ9XWDE4Dtq7D+WybzjLxOaYRgX
O9l8wjBVlCqwhtcRWJSP5//d3PJ1NKVnVQcIPAHJFVLgeCko+mxtduarQmgJd6Vx
fNAVa6DnmQ1jJETs7Wnq17oTJV0UlcbucntwOhuj5y4kBwu9qVw9rtlCysxcIzGF
KCaFjhUCgYEArjavtpDghEWHzHtzk/gAV6JJMecsylE3N5aP/U0PWq3pQ/XQ9SdE
BxSRd7wEURd+OjvJfwkxJ8rO2XqfwdTAiOV0SRzCHN+Tuz9tcZxVc7tQwEVILAJw
UkIgBfGBOSEXSX3NBX3yk6Rri79Qkd7O1FDZ8y500/LEz61qPAsldJ0=
-----END RSA PRIVATE KEY-----
16 changes: 16 additions & 0 deletions shotover-proxy/examples/redis-tls/topology.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
sources:
redis_prod:
Redis:
batch_size_hint: 1
listen_addr: "127.0.0.1:6379"
tls:
certificate_authority_path: "examples/redis-tls/tls_keys/ca.crt"
certificate_path: "examples/redis-tls/tls_keys/redis.crt"
private_key_path: "examples/redis-tls/tls_keys/redis.key"
chain_config:
redis_chain:
- RedisDestination:
remote_address: "127.0.0.1:1111"
source_to_chain_mapping:
redis_prod: redis_chain
1 change: 1 addition & 0 deletions shotover-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pub mod protocols;
pub mod runner;
pub mod server;
pub mod sources;
pub mod tls;
pub mod transforms;
85 changes: 58 additions & 27 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::message::Messages;
use crate::tls::TlsAcceptor;
use crate::transforms::chain::TransformChain;
use crate::transforms::Wrapper;
use anyhow::Result;
use futures::StreamExt;
use metrics::gauge;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, watch, Semaphore};
use tokio::time;
use tokio::time::timeout;
Expand Down Expand Up @@ -69,6 +72,8 @@ pub struct TcpCodecListener<C: Codec> {
/// Used as part of the graceful shutdown process to wait for client
/// connections to complete processing.
pub shutdown_complete_tx: mpsc::Sender<()>,

pub tls: Option<TlsAcceptor>,
}

impl<C: Codec + 'static> TcpCodecListener<C> {
Expand Down Expand Up @@ -179,6 +184,8 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
// Notifies the receiver half once all clones are
// dropped.
_shutdown_complete: self.shutdown_complete_tx.clone(),

tls: self.tls.clone(),
};

// Spawn a new task to process the connections. Tokio tasks are like
Expand Down Expand Up @@ -266,6 +273,46 @@ pub struct Handler<C: Codec> {
shutdown: Shutdown,

_shutdown_complete: mpsc::Sender<()>,

tls: Option<TlsAcceptor>,
}

fn spawn_read_write_tasks<
C: Codec + 'static,
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
>(
codec: C,
rx: R,
tx: W,
in_tx: UnboundedSender<Messages>,
out_rx: UnboundedReceiver<Messages>,
) {
let mut reader = FramedRead::new(rx, codec.clone());
let writer = FramedWrite::new(tx, codec);

tokio::spawn(async move {
while let Some(message) = reader.next().await {
match message {
Ok(message) => {
if let Err(error) = in_tx.send(message) {
warn!("failed to send message: {}", error);
return;
}
}
Err(error) => {
warn!("failed to decode message: {}", error);
return;
}
}
}
});

tokio::spawn(async move {
let rx_stream = UnboundedReceiverStream::new(out_rx).map(Ok);
let r = rx_stream.forward(writer).await;
debug!("Stream ended {:?}", r);
});
}

impl<C: Codec + 'static> Handler<C> {
Expand All @@ -287,33 +334,17 @@ impl<C: Codec + 'static> Handler<C> {
// new request frame.
let mut idle_time_seconds: u64 = 1;

let (in_tx, mut in_rx) = tokio::sync::mpsc::unbounded_channel::<Messages>();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<Messages>();

let (rx, tx) = stream.into_split();

let mut reader = FramedRead::new(rx, self.codec.clone());
let writer = FramedWrite::new(tx, self.codec.clone());

tokio::spawn(async move {
while let Some(maybe_message) = reader.next().await {
match maybe_message {
Ok(resp_messages) => {
let _ = in_tx.send(resp_messages);
}
Err(e) => {
warn!("Frame error - {:?}", e);
break;
}
};
}
});

tokio::spawn(async move {
let rx_stream = UnboundedReceiverStream::new(out_rx).map(Ok);
let r = rx_stream.forward(writer).await;
debug!("Stream ended {:?}", r);
});
let (in_tx, mut in_rx) = mpsc::unbounded_channel::<Messages>();
let (out_tx, out_rx) = mpsc::unbounded_channel::<Messages>();

if let Some(tls) = &self.tls {
let tls_stream = tls.accept(stream).await?;
let (rx, tx) = tokio::io::split(tls_stream);
spawn_read_write_tasks(self.codec.clone(), rx, tx, in_tx, out_rx);
} else {
let (rx, tx) = stream.into_split();
spawn_read_write_tasks(self.codec.clone(), rx, tx, in_tx, out_rx);
};

while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown signal
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/src/sources/cassandra_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl CassandraSource {
limit_connections: Arc::new(Semaphore::new(connection_limit.unwrap_or(512))),
trigger_shutdown_rx: trigger_shutdown_rx.clone(),
shutdown_complete_tx,
tls: None,
};

let join_handle = Handle::current().spawn(async move {
Expand Down
Loading

0 comments on commit 39a0cba

Please sign in to comment.