Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Socks5 proxy transport for RPC blockchain #493

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ compact_filters = ["rocksdb", "socks", "lazy_static", "cc"]
key-value-db = ["sled"]
all-keys = ["keys-bip39"]
keys-bip39 = ["bip39"]
rpc = ["bitcoincore-rpc"]
rpc = ["bitcoincore-rpc", "socks"]

# We currently provide mulitple implementations of `Blockchain`, all are
# blocking except for the `EsploraBlockchain` which can be either async or
Expand Down
3 changes: 3 additions & 0 deletions src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub use self::rpc::RpcBlockchain;
#[cfg(feature = "rpc")]
pub use self::rpc::RpcConfig;

#[cfg(feature = "rpc")]
pub mod rpc_proxy;

#[cfg(feature = "esplora")]
#[cfg_attr(docsrs, doc(cfg(feature = "esplora")))]
pub mod esplora;
Expand Down
25 changes: 23 additions & 2 deletions src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
//! auth: Auth::Cookie {
//! file: "/home/user/.bitcoin/.cookie".into(),
//! },
//! proxy: None,
//! proxy_auth: None,
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
//! skip_blocks: None,
//! };
//! let blockchain = RpcBlockchain::from_config(&config);
//! ```

use super::rpc_proxy::ProxyTransport;
use crate::bitcoin::consensus::deserialize;
use crate::bitcoin::{Address, Network, OutPoint, Transaction, TxOut, Txid};
use crate::blockchain::{Blockchain, Capability, ConfigurableBlockchain, Progress};
Expand Down Expand Up @@ -60,7 +63,6 @@ pub struct RpcBlockchain {
capabilities: HashSet<Capability>,
/// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
skip_blocks: Option<u32>,

/// This is a fixed Address used as a hack key to store information on the node
_storage_address: Address,
}
Expand All @@ -72,6 +74,10 @@ pub struct RpcConfig {
pub url: String,
/// The bitcoin node authentication mechanism
pub auth: Auth,
/// A proxy (like Tor) can be used to connect Bitcoin Core RPC
pub proxy: Option<String>,
/// Authentication for proxy server
pub proxy_auth: Option<(String, String)>,
/// The network we are using (it will be checked the bitcoin node network matches this)
pub network: Network,
/// The wallet name in the bitcoin node, consider using [crate::wallet::wallet_name_from_descriptor] for this
Expand Down Expand Up @@ -358,7 +364,20 @@ impl ConfigurableBlockchain for RpcBlockchain {
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);

let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let client = if let Some(proxy) = &config.proxy {
let proxy_transport = ProxyTransport::new(
proxy,
wallet_url.as_str(),
config.proxy_auth.clone(),
&config.auth,
)?;
Client::from_jsonrpc(bitcoincore_rpc::jsonrpc::Client::with_transport(
proxy_transport,
))
} else {
Client::new(wallet_url.as_str(), config.auth.clone().into())?
};

let loaded_wallets = client.list_wallets()?;
if loaded_wallets.contains(&wallet_name) {
debug!("wallet already loaded {:?}", wallet_name);
Expand Down Expand Up @@ -437,6 +456,8 @@ crate::bdk_blockchain_tests! {
let config = RpcConfig {
url: test_client.bitcoind.rpc_url(),
auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() },
proxy: None,
proxy_auth: None,
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{:?}", std::time::SystemTime::now() ),
skip_blocks: None,
Expand Down
241 changes: 241 additions & 0 deletions src/blockchain/rpc_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Bitcoin Dev Kit
// Written in 2021 by Rajarshi Maitra <[email protected]>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! A SOCKS5 proxy transport implementation for RPC blockchain.
//!
//! This is currently internal to the lib and only compatible with
//! Bitcoin Core RPC.

use super::rpc::Auth;
use bitcoin::base64;
use bitcoincore_rpc::jsonrpc::Error as JSONRPC_Error;
use bitcoincore_rpc::jsonrpc::{Request, Response, Transport};
use bitcoincore_rpc::Error as RPC_Error;
use socks::Socks5Stream;
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::time::{Duration, Instant};

/// Errors that can be thrown by [`ProxyTransport`](crate::blockchain::rpc_proxy::ProxyTransport)
#[derive(Debug)]
pub enum RpcProxyError {
/// Bitcoin core rpc error
CoreRpc(bitcoincore_rpc::Error),
/// IO error
Io(std::io::Error),
/// Invalid RPC url
InvalidUrl,
/// Error serializing or deserializing JSON data
Json(serde_json::Error),
/// RPC timeout error
RpcTimeout,
/// Http Parsing Error
HttpParsing,
/// Http Timeout Error
HttpTimeout,
/// Http Response Code
HttpResponseCode(u16),
}

impl std::fmt::Display for RpcProxyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

impl std::error::Error for RpcProxyError {}

impl_error!(bitcoincore_rpc::Error, CoreRpc, RpcProxyError);
impl_error!(std::io::Error, Io, RpcProxyError);
impl_error!(serde_json::Error, Json, RpcProxyError);

// We need to backport RpcProxyError to satisfy Transport trait bound
impl From<RpcProxyError> for JSONRPC_Error {
fn from(e: RpcProxyError) -> Self {
Self::Transport(Box::new(e))
}
}

/// SOCKS5 proxy transport
/// This is currently designed to work only with Bitcoin Core RPC
pub(crate) struct ProxyTransport {
proxy_addr: String,
target_addr: String,
proxy_credential: Option<(String, String)>,
wallet_path: String,
rpc_auth: Option<String>,
timeout: Duration,
}

impl ProxyTransport {
/// Create a new ProxyTransport
pub(crate) fn new(
proxy_addr: &str,
rpc_url: &str,
proxy_credential: Option<(String, String)>,
rpc_auth: &Auth,
) -> Result<Self, RpcProxyError> {
// Fetch the RPC address:port and wallet path from url
let (target_addr, wallet_path) = {
// the url will be of form "http://<rpc-host>:<rpc-port>/wallet/<wallet-file-name>"
(rpc_url[7..22].to_owned(), rpc_url[22..].to_owned())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally here I would parse the url entirely, it's not reliable to just index into the string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this does feel clunky. But this seemed easier for now without brain storming too much on a parser. Can you suggest a better way? Would update that in next revise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A quick way without any extra dependencies could be:

  1. check the prefix. It should be either "http://" or nothing
  2. replace "http://" with ""
  3. splitn with n = 2, separator = "/"
  4. first part is the host, second part the wallet path

};

// fetch username password from rpc authentication
let rpc_auth = {
if let (Some(user), Some(pass)) = Self::get_user_pass(rpc_auth)? {
let mut auth = user;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not format!("{}:{}", user, pass)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the RPC server expects the auth string to be formatted in a certain way which is base64(b"Basic {user}:{pass}"). I just broke down the steps. This can be written in shorter lines.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it just seems a bit redundant here since the steps are so simple

auth.push(':');
auth.push_str(&pass[..]);
Some(format!("Basic {}", &base64::encode(auth.as_bytes())))
} else {
None
}
};

Ok(ProxyTransport {
proxy_addr: proxy_addr.to_owned(),
target_addr,
proxy_credential,
wallet_path,
rpc_auth,
timeout: Duration::from_secs(15), // Same as regular RPC default
})
}

// Helper function to parse username:password pair for rpc Auth
fn get_user_pass(auth: &Auth) -> Result<(Option<String>, Option<String>), RpcProxyError> {
rajarshimaitra marked this conversation as resolved.
Show resolved Hide resolved
use std::io::Read;
match auth {
Auth::None => Ok((None, None)),
Auth::UserPass { username, password } => {
Ok((Some(username.clone()), Some(password.clone())))
}
Auth::Cookie { file } => {
let mut file = File::open(file)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let mut split = contents.splitn(2, ':');
Ok((
Some(split.next().ok_or(RPC_Error::InvalidCookieFile)?.into()),
Some(split.next().ok_or(RPC_Error::InvalidCookieFile)?.into()),
))
}
}
}

// Try to read a line from a buffered reader. If no line can be read till the deadline is reached
// return a timeout error.
fn get_line<R: BufRead>(reader: &mut R, deadline: Instant) -> Result<String, RpcProxyError> {
let mut line = String::new();
while deadline > Instant::now() {
match reader.read_line(&mut line) {
// EOF reached for now, try again later
Ok(0) => std::thread::sleep(Duration::from_millis(5)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When EOF is reached there's nothing more to wait for, you should return an error here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function is blocking, so it will internally wait for data and only return a string when there's something: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_line

Copy link
Contributor Author

@rajarshimaitra rajarshimaitra Dec 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote it this way because I wanted to introduce a timeout into it. Socks doesn't work with timeouts. Is there a way we can do timeouts without this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, read_line() is blocking and won't return until either the fd is closed or there's a new line available.

The timeout is usually set on the TcpStream, if that's not exposed by the socks library we can't do anything.

// received useful data, return it
Ok(_) => return Ok(line),
// io error occurred, abort
Err(e) => return Err(e.into()),
}
}
Err(RpcProxyError::RpcTimeout)
}

// Http request and response over SOCKS5
fn request<R>(&self, req: impl serde::Serialize) -> Result<R, RpcProxyError>
where
R: for<'a> serde::de::Deserialize<'a>,
{
let request_deadline = Instant::now() + self.timeout;

// Open connection
let mut socks_stream = if let Some((username, password)) = &self.proxy_credential {
Socks5Stream::connect_with_password(
&self.proxy_addr[..],
&self.target_addr[..],
&username[..],
&password[..],
)?
} else {
Socks5Stream::connect(&self.proxy_addr[..], &self.target_addr[..])?
};

let socks_stream = socks_stream.get_mut();

// Serialize the body first so we can set the Content-Length header.
let body = serde_json::to_vec(&req)?;

// Send HTTP request
socks_stream.write_all(b"POST ")?;
socks_stream.write_all(self.wallet_path.as_bytes())?;
socks_stream.write_all(b" HTTP/1.1\r\n")?;
// Write headers
socks_stream.write_all(b"Content-Type: application/json-rpc\r\n")?;
socks_stream.write_all(b"Content-Length: ")?;
socks_stream.write_all(body.len().to_string().as_bytes())?;
socks_stream.write_all(b"\r\n")?;
if let Some(ref auth) = self.rpc_auth {
socks_stream.write_all(b"Authorization: ")?;
socks_stream.write_all(auth.as_ref())?;
socks_stream.write_all(b"\r\n")?;
}
// Write body
socks_stream.write_all(b"\r\n")?;
socks_stream.write_all(&body)?;
socks_stream.flush()?;

// Receive response
let mut reader = BufReader::new(socks_stream);

// Parse first HTTP response header line
let http_response = Self::get_line(&mut reader, request_deadline)?;
if http_response.len() < 12 || !http_response.starts_with("HTTP/1.1 ") {
return Err(RpcProxyError::HttpParsing);
}
let response_code = match http_response[9..12].parse::<u16>() {
Ok(n) => n,
Err(_) => return Err(RpcProxyError::HttpParsing),
};

// Skip response header fields
while Self::get_line(&mut reader, request_deadline)? != "\r\n" {}

// Even if it's != 200, we parse the response as we may get a JSONRPC error instead
// of the less meaningful HTTP error code.
let resp_body = Self::get_line(&mut reader, request_deadline)?;
match serde_json::from_str(&resp_body) {
Ok(s) => Ok(s),
Err(e) => {
if response_code != 200 {
Err(RpcProxyError::HttpResponseCode(response_code))
} else {
// If it was 200 then probably it was legitimately a parse error
Err(e.into())
}
}
}
}
}

// Make ProxyTransport usable in bitcoin_core::rpc::Client
impl Transport for ProxyTransport {
fn send_request(&self, req: Request) -> Result<Response, JSONRPC_Error> {
Ok(self.request(req)?)
}

fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, JSONRPC_Error> {
Ok(self.request(reqs)?)
}

fn fmt_target(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}/{}", self.target_addr, self.wallet_path)
}
}
9 changes: 7 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ pub enum Error {
Sled(sled::Error),
#[cfg(feature = "rpc")]
/// Rpc client error
Rpc(bitcoincore_rpc::Error),
RpcClient(bitcoincore_rpc::Error),
/// Rpc proxy error
#[cfg(feature = "rpc")]
RpcProxy(crate::blockchain::rpc_proxy::RpcProxyError),
#[cfg(feature = "sqlite")]
/// Rusqlite client error
Rusqlite(rusqlite::Error),
Expand Down Expand Up @@ -196,7 +199,9 @@ impl_error!(electrum_client::Error, Electrum);
#[cfg(feature = "key-value-db")]
impl_error!(sled::Error, Sled);
#[cfg(feature = "rpc")]
impl_error!(bitcoincore_rpc::Error, Rpc);
impl_error!(bitcoincore_rpc::Error, RpcClient);
#[cfg(feature = "rpc")]
impl_error!(crate::blockchain::rpc_proxy::RpcProxyError, RpcProxy);
#[cfg(feature = "sqlite")]
impl_error!(rusqlite::Error, Rusqlite);

Expand Down