Skip to content

Commit

Permalink
feat(rust): improvements to portals commands arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianbenavides committed Nov 12, 2024
1 parent f81633e commit 21a6fbe
Show file tree
Hide file tree
Showing 26 changed files with 556 additions and 229 deletions.
266 changes: 219 additions & 47 deletions implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,145 @@
use crate::node::util::initialize_default_node;
use crate::tcp::inlet::create::CreateCommand as InletCreateCommand;
use crate::shared_args::OptionalTimeoutArg;
use crate::tcp::inlet::create::{tcp_inlet_default_from_addr, tcp_inlet_default_to_addr};
use crate::tcp::util::alias_parser;
use crate::util::parsers::duration_parser;
use crate::util::parsers::hostname_parser;
use crate::util::{port_is_free_guard, print_warning_for_deprecated_flag_replaced};
use crate::{Command, CommandGlobalOpts};
use async_trait::async_trait;
use clap::builder::FalseyValueParser;
use clap::Args;
use colorful::Colorful;
use miette::miette;
use miette::{miette, IntoDiagnostic};
use ockam::identity::Identifier;
use ockam::transport::HostnamePort;
use ockam::Context;
use ockam_abac::PolicyExpression;
use ockam_api::address::extract_address_value;
use ockam_api::cli_state::random_name;
use ockam_api::colors::color_primary;
use ockam_api::influxdb::{InfluxDBPortals, LeaseUsage};
use ockam_api::nodes::models::portal::InletStatus;
use ockam_api::nodes::BackgroundNodeClient;
use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, ConnectionStatus};
use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, CliState, ConnectionStatus};
use ockam_core::api::{Reply, Status};
use ockam_multiaddr::MultiAddr;
use ockam_multiaddr::{proto, MultiAddr, Protocol};
use ockam_node::compat::asynchronous::resolve_peer;
use std::str::FromStr;
use std::time::Duration;
use tracing::trace;

/// Create InfluxDB Inlets
#[derive(Clone, Debug, Args)]
pub struct InfluxDBCreateCommand {
pub struct CreateCommand {
/// Assign a name to this InfluxDB Inlet
#[arg(id = "NAME", value_parser = alias_parser)]
pub name: Option<String>,

/// Node on which to start the InfluxDB Inlet.
#[arg(long, display_order = 900, id = "NODE_NAME", value_parser = extract_address_value)]
pub at: Option<String>,

/// Address on which to accept InfluxDB connections.
#[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)]
pub from: HostnamePort,

/// Route to a InfluxDB Outlet or the name of the InfluxDB Outlet service you want to connect to.
///
/// If you are connecting to a local node, you can provide the route as `/node/n/service/outlet`.
///
/// If you are connecting to a remote node through a relay in the Orchestrator you can either
/// provide the full route to the InfluxDB Outlet as `/project/myproject/service/forward_to_myrelay/secure/api/service/outlet`,
/// or just the name of the service as `outlet` or `/service/outlet`.
/// If you are passing just the service name, consider using `--via` to specify the
/// relay name (e.g. `ockam tcp-inlet create --to outlet --via myrelay`).
#[arg(long, display_order = 900, id = "ROUTE", default_value_t = tcp_inlet_default_to_addr())]
pub to: String,

/// Name of the relay that this InfluxDB Inlet will use to connect to the InfluxDB Outlet.
///
/// Use this flag when you are using `--to` to specify the service name of a InfluxDB Outlet
/// that is reachable through a relay in the Orchestrator.
/// If you don't provide it, the default relay name will be used, if necessary.
#[arg(long, display_order = 900, id = "RELAY_NAME")]
pub via: Option<String>,

/// Identity to be used to create the secure channel. If not set, the node's identity will be used.
#[arg(long, value_name = "IDENTITY_NAME", display_order = 900)]
pub identity: Option<String>,

/// Authorized identifier for secure channel connection
#[arg(long, name = "AUTHORIZED", display_order = 900)]
pub authorized: Option<Identifier>,

/// [DEPRECATED] Use the <NAME> positional argument instead
#[arg(long, display_order = 900, id = "ALIAS", value_parser = alias_parser)]
pub alias: Option<String>,

/// Policy expression that will be used for access control to the InfluxDB Inlet.
/// If you don't provide it, the policy set for the "tcp-inlet" resource type will be used.
///
/// You can check the fallback policy with `ockam policy show --resource-type tcp-inlet`.
#[arg(
long,
visible_alias = "expression",
display_order = 900,
id = "POLICY_EXPRESSION"
)]
pub allow: Option<PolicyExpression>,

/// Time to wait for the outlet to be available.
#[arg(long, display_order = 900, id = "WAIT", default_value = "5s", value_parser = duration_parser)]
pub connection_wait: Duration,

/// Time to wait before retrying to connect to the InfluxDB Outlet.
#[arg(long, display_order = 900, id = "RETRY", default_value = "20s", value_parser = duration_parser)]
pub retry_wait: Duration,

#[command(flatten)]
pub tcp_inlet: InletCreateCommand,
pub timeout: OptionalTimeoutArg,

/// Create the InfluxDB Inlet without waiting for the InfluxDB Outlet to connect
#[arg(long, default_value = "false")]
pub no_connection_wait: bool,

/// Enable UDP NAT puncture.
#[arg(
long,
visible_alias = "enable-udp-puncture",
value_name = "BOOL",
default_value_t = false,
hide = true
)]
pub udp: bool,

/// Disable fallback to TCP.
/// TCP won't be used to transfer data between the Inlet and the Outlet.
#[arg(
long,
visible_alias = "disable-tcp-fallback",
value_name = "BOOL",
default_value_t = false,
hide = true
)]
pub no_tcp_fallback: bool,

/// Use eBPF and RawSocket to access TCP packets instead of TCP data stream.
/// If `OCKAM_PRIVILEGED` env variable is set to 1, this argument will be `true`.
#[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)]
pub privileged: bool,

#[arg(long, value_name = "BOOL", default_value_t = false, hide = true)]
/// Enable TLS for the InfluxDB Inlet.
/// Uses the default project TLS certificate provider, `/project/default/service/tls_certificate_provider`.
/// To specify a different certificate provider, use `--tls-certificate-provider`.
/// Requires `ockam-tls-certificate` credential attribute.
pub tls: bool,

#[arg(long, value_name = "ROUTE", hide = true)]
/// Enable TLS for the InfluxDB Inlet using the provided certificate provider.
/// Requires `ockam-tls-certificate` credential attribute.
pub tls_certificate_provider: Option<MultiAddr>,

/// Share the leases among the clients or use a separate lease for each client
#[arg(long, default_value = "per-client")]
Expand All @@ -33,48 +153,44 @@ pub struct InfluxDBCreateCommand {
}

#[async_trait]
impl Command for InfluxDBCreateCommand {
impl Command for CreateCommand {
const NAME: &'static str = "influxdb-inlet create";

async fn async_run(mut self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> {
initialize_default_node(ctx, &opts).await?;
self = self.parse_args(&opts).await?;
let cmd = self.parse_args(&opts).await?;

let mut node = BackgroundNodeClient::create(ctx, &opts.state, &self.tcp_inlet.at).await?;
self.tcp_inlet
.timeout
.timeout
.map(|t| node.set_timeout_mut(t));
let mut node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?;
cmd.timeout.timeout.map(|t| node.set_timeout_mut(t));

let inlet_status = {
let pb = opts.terminal.progress_bar();
if let Some(pb) = pb.as_ref() {
pb.set_message(format!(
"Creating a InfluxDB Inlet at {}...\n",
color_primary(&self.tcp_inlet.from)
color_primary(&cmd.from)
));
}

loop {
let result: Reply<InletStatus> = node
.create_influxdb_inlet(
ctx,
&self.tcp_inlet.from,
&self.tcp_inlet.to(),
self.tcp_inlet.alias.as_ref().expect("The `alias` argument should be set to its default value if not provided"),
&self.tcp_inlet.authorized,
&self.tcp_inlet.allow,
self.tcp_inlet.connection_wait,
!self.tcp_inlet.no_connection_wait,
&self
.tcp_inlet
&cmd.from,
&cmd.to(),
cmd.name.as_ref().expect("The `name` argument should be set to its default value if not provided"),
&cmd.authorized,
&cmd.allow,
cmd.connection_wait,
!cmd.no_connection_wait,
&cmd
.secure_channel_identifier(&opts.state)
.await?,
self.tcp_inlet.udp,
self.tcp_inlet.no_tcp_fallback,
&self.tcp_inlet.tls_certificate_provider,
self.leased_token_strategy.clone(),
self.lease_manager_route.clone(),
cmd.udp,
cmd.no_tcp_fallback,
&cmd.tls_certificate_provider,
cmd.leased_token_strategy.clone(),
cmd.lease_manager_route.clone(),
)
.await?;

Expand All @@ -90,49 +206,46 @@ impl Command for InfluxDBCreateCommand {
};
trace!("the inlet creation returned a non-OK status: {s:?}");

if self.tcp_inlet.retry_wait.as_millis() == 0 {
return Err(miette!("Failed to create TCP inlet"))?;
if cmd.retry_wait.as_millis() == 0 {
return Err(miette!("Failed to create InfluxDB inlet"))?;
}

if let Some(pb) = pb.as_ref() {
pb.set_message(format!(
"Waiting for TCP Inlet {} to be available... Retrying momentarily\n",
color_primary(&self.tcp_inlet.to)
"Waiting for InfluxDB Inlet {} to be available... Retrying momentarily\n",
color_primary(&cmd.to)
));
}
tokio::time::sleep(self.tcp_inlet.retry_wait).await
tokio::time::sleep(cmd.retry_wait).await
}
}
}
};

let node_name = node.node_name();
self.tcp_inlet
.add_inlet_created_event(&opts, &node_name, &inlet_status)
.await?;

let created_message = fmt_ok!(
"Created a new InfluxDB Inlet in the Node {} bound to {}\n",
color_primary(&node_name),
color_primary(&self.tcp_inlet.from)
color_primary(&cmd.from)
);

let plain = if self.tcp_inlet.no_connection_wait {
created_message + &fmt_log!("It will automatically connect to the TCP Outlet at {} as soon as it is available",
color_primary(&self.tcp_inlet.to)
let plain = if cmd.no_connection_wait {
created_message + &fmt_log!("It will automatically connect to the InfluxDB Outlet at {} as soon as it is available",
color_primary(&cmd.to)
)
} else if inlet_status.status == ConnectionStatus::Up {
created_message
+ &fmt_log!(
"sending traffic to the TCP Outlet at {}",
color_primary(&self.tcp_inlet.to)
"sending traffic to the InfluxDB Outlet at {}",
color_primary(&cmd.to)
)
} else {
fmt_warn!(
"A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the TCP Outlet at {}\n",
"A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the InfluxDB Outlet at {}\n",
color_primary(&node_name),
color_primary(self.tcp_inlet.from.to_string()),
color_primary(&self.tcp_inlet.to)
color_primary(cmd.from.to_string()),
color_primary(&cmd.to)
) + &fmt_info!("It will retry to connect automatically")
};

Expand All @@ -147,9 +260,52 @@ impl Command for InfluxDBCreateCommand {
}
}

impl InfluxDBCreateCommand {
impl CreateCommand {
async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result<Self> {
self.tcp_inlet = self.tcp_inlet.parse_args(opts).await?;
if let Some(alias) = self.alias.as_ref() {
print_warning_for_deprecated_flag_replaced(
opts,
"alias",
"the <NAME> positional argument",
)?;
if self.name.is_none() {
self.name = Some(alias.clone());
} else {
return Err(miette!(
"--alias is deprecated and can't be used together with the <NAME> positional argument",
));
}
} else {
self.name = self.name.or_else(|| Some(random_name()));
}

let from = resolve_peer(self.from.to_string())
.await
.into_diagnostic()?;
port_is_free_guard(&from)?;

self.to = crate::tcp::inlet::create::CreateCommand::parse_arg_to(
&opts.state,
self.to,
self.via.as_ref(),
)
.await?;
if self.to().matches(0, &[proto::Project::CODE.into()]) && self.authorized.is_some() {
return Err(miette!(
"--authorized can not be used with project addresses"
))?;
}

self.tls_certificate_provider = if let Some(tls_certificate_provider) =
&self.tls_certificate_provider
{
Some(tls_certificate_provider.clone())
} else if self.tls {
Some(MultiAddr::from_str("/project/default/service/tls_certificate_provider").unwrap())
} else {
None
};

if self
.lease_manager_route
.as_ref()
Expand All @@ -159,6 +315,22 @@ impl InfluxDBCreateCommand {
"lease-manager-route argument requires leased-token-strategy=per-client"
))?
};

Ok(self)
}

pub fn to(&self) -> MultiAddr {
MultiAddr::from_str(&self.to).unwrap()
}

pub async fn secure_channel_identifier(
&self,
state: &CliState,
) -> miette::Result<Option<Identifier>> {
if let Some(identity_name) = self.identity.as_ref() {
Ok(Some(state.get_identifier_by_name(identity_name).await?))
} else {
Ok(None)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::{Args, Subcommand};

use create::InfluxDBCreateCommand;
use create::CreateCommand;

use crate::{docs, Command, CommandGlobalOpts};

Expand All @@ -22,7 +22,7 @@ pub struct InfluxDBInletCommand {

#[derive(Clone, Debug, Subcommand)]
pub enum InfluxDBInletSubCommand {
Create(InfluxDBCreateCommand),
Create(CreateCommand),
}

impl InfluxDBInletCommand {
Expand Down
Loading

0 comments on commit 21a6fbe

Please sign in to comment.