From f4dba3964fd28eb591f84ae55c7210348118fba4 Mon Sep 17 00:00:00 2001 From: lif <> Date: Tue, 26 Mar 2024 23:46:30 -0700 Subject: [PATCH 1/5] wip - vnc proxy - API endpoint --- oxide.json | 39 +++++++++ sdk-httpmock/src/generated_httpmock.rs | 71 +++++++++++++++++ sdk/src/generated_sdk.rs | 105 +++++++++++++++++++++++++ 3 files changed, 215 insertions(+) diff --git a/oxide.json b/oxide.json index 9c39829a..ea11a176 100644 --- a/oxide.json +++ b/oxide.json @@ -2676,6 +2676,45 @@ } } }, + "/v1/instances/{instance}/vnc": { + "get": { + "tags": [ + "instances" + ], + "summary": "Stream instance VNC framebuffer", + "operationId": "instance_vnc", + "parameters": [ + { + "in": "path", + "name": "instance", + "description": "Name or ID of the instance", + "required": true, + "schema": { + "$ref": "#/components/schemas/NameOrId" + } + }, + { + "in": "query", + "name": "project", + "description": "Name or ID of the project", + "schema": { + "$ref": "#/components/schemas/NameOrId" + } + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + }, + "x-dropshot-websocket": {} + } + }, "/v1/ip-pools": { "get": { "tags": [ diff --git a/sdk-httpmock/src/generated_httpmock.rs b/sdk-httpmock/src/generated_httpmock.rs index b222997c..7d423e29 100644 --- a/sdk-httpmock/src/generated_httpmock.rs +++ b/sdk-httpmock/src/generated_httpmock.rs @@ -4306,6 +4306,62 @@ pub mod operations { } } + pub struct InstanceVncWhen(httpmock::When); + impl InstanceVncWhen { + pub fn new(inner: httpmock::When) -> Self { + Self( + inner + .method(httpmock::Method::GET) + .path_matches(regex::Regex::new("^/v1/instances/[^/]*/vnc$").unwrap()), + ) + } + + pub fn into_inner(self) -> httpmock::When { + self.0 + } + + pub fn instance(self, value: &types::NameOrId) -> Self { + let re = + regex::Regex::new(&format!("^/v1/instances/{}/vnc$", value.to_string())).unwrap(); + Self(self.0.path_matches(re)) + } + + pub fn project<'a, T>(self, value: T) -> Self + where + T: Into>, + { + if let Some(value) = value.into() { + Self(self.0.query_param("project", value.to_string())) + } else { + Self(self.0.matches(|req| { + req.query_params + .as_ref() + .and_then(|qs| qs.iter().find(|(key, _)| key == "project")) + .is_none() + })) + } + } + } + + pub struct InstanceVncThen(httpmock::Then); + impl InstanceVncThen { + pub fn new(inner: httpmock::Then) -> Self { + Self(inner) + } + + pub fn into_inner(self) -> httpmock::Then { + self.0 + } + + pub fn default_response(self, status: u16) -> Self { + Self(self.0.status(status)) + } + + pub fn switching_protocols(self) -> Self { + Self(self.0.status(101u16)) + } + } + pub struct ProjectIpPoolListWhen(httpmock::When); impl ProjectIpPoolListWhen { pub fn new(inner: httpmock::When) -> Self { @@ -14397,6 +14453,9 @@ pub trait MockServerExt { fn instance_stop(&self, config_fn: F) -> httpmock::Mock where F: FnOnce(operations::InstanceStopWhen, operations::InstanceStopThen); + fn instance_vnc(&self, config_fn: F) -> httpmock::Mock + where + F: FnOnce(operations::InstanceVncWhen, operations::InstanceVncThen); fn project_ip_pool_list(&self, config_fn: F) -> httpmock::Mock where F: FnOnce(operations::ProjectIpPoolListWhen, operations::ProjectIpPoolListThen); @@ -15525,6 +15584,18 @@ impl MockServerExt for httpmock::MockServer { }) } + fn instance_vnc(&self, config_fn: F) -> httpmock::Mock + where + F: FnOnce(operations::InstanceVncWhen, operations::InstanceVncThen), + { + self.mock(|when, then| { + config_fn( + operations::InstanceVncWhen::new(when), + operations::InstanceVncThen::new(then), + ) + }) + } + fn project_ip_pool_list(&self, config_fn: F) -> httpmock::Mock where F: FnOnce(operations::ProjectIpPoolListWhen, operations::ProjectIpPoolListThen), diff --git a/sdk/src/generated_sdk.rs b/sdk/src/generated_sdk.rs index 1c733c4e..7e9e0c20 100644 --- a/sdk/src/generated_sdk.rs +++ b/sdk/src/generated_sdk.rs @@ -39011,6 +39011,21 @@ pub trait ClientInstancesExt { /// .await; /// ``` fn instance_stop(&self) -> builder::InstanceStop; + /// Stream instance VNC framebuffer + /// + /// Sends a `GET` request to `/v1/instances/{instance}/vnc` + /// + /// Arguments: + /// - `instance`: Name or ID of the instance + /// - `project`: Name or ID of the project + /// ```ignore + /// let response = client.instance_vnc() + /// .instance(instance) + /// .project(project) + /// .send() + /// .await; + /// ``` + fn instance_vnc(&self) -> builder::InstanceVnc; /// List network interfaces /// /// Sends a `GET` request to `/v1/network-interfaces` @@ -39184,6 +39199,10 @@ impl ClientInstancesExt for Client { builder::InstanceStop::new(self) } + fn instance_vnc(&self) -> builder::InstanceVnc { + builder::InstanceVnc::new(self) + } + fn instance_network_interface_list(&self) -> builder::InstanceNetworkInterfaceList { builder::InstanceNetworkInterfaceList::new(self) } @@ -47452,6 +47471,92 @@ pub mod builder { } } + /// Builder for [`ClientInstancesExt::instance_vnc`] + /// + /// [`ClientInstancesExt::instance_vnc`]: super::ClientInstancesExt::instance_vnc + #[derive(Debug, Clone)] + pub struct InstanceVnc<'a> { + client: &'a super::Client, + instance: Result, + project: Result, String>, + } + + impl<'a> InstanceVnc<'a> { + pub fn new(client: &'a super::Client) -> Self { + Self { + client: client, + instance: Err("instance was not initialized".to_string()), + project: Ok(None), + } + } + + pub fn instance(mut self, value: V) -> Self + where + V: std::convert::TryInto, + { + self.instance = value + .try_into() + .map_err(|_| "conversion to `NameOrId` for instance failed".to_string()); + self + } + + pub fn project(mut self, value: V) -> Self + where + V: std::convert::TryInto, + { + self.project = value + .try_into() + .map(Some) + .map_err(|_| "conversion to `NameOrId` for project failed".to_string()); + self + } + + /// Sends a `GET` request to `/v1/instances/{instance}/vnc` + pub async fn send( + self, + ) -> Result, Error> { + let Self { + client, + instance, + project, + } = self; + let instance = instance.map_err(Error::InvalidRequest)?; + let project = project.map_err(Error::InvalidRequest)?; + let url = format!( + "{}/v1/instances/{}/vnc", + client.baseurl, + encode_path(&instance.to_string()), + ); + let mut query = Vec::with_capacity(1usize); + if let Some(v) = &project { + query.push(("project", v.to_string())); + } + #[allow(unused_mut)] + let mut request = client + .client + .get(url) + .query(&query) + .header(reqwest::header::CONNECTION, "Upgrade") + .header(reqwest::header::UPGRADE, "websocket") + .header(reqwest::header::SEC_WEBSOCKET_VERSION, "13") + .header( + reqwest::header::SEC_WEBSOCKET_KEY, + base64::Engine::encode( + &base64::engine::general_purpose::STANDARD, + rand::random::<[u8; 16]>(), + ), + ) + .build()?; + let result = client.client.execute(request).await; + let response = result?; + match response.status().as_u16() { + 101u16 => ResponseValue::upgrade(response).await, + 200..=299 => ResponseValue::upgrade(response).await, + _ => Err(Error::UnexpectedResponse(response)), + } + } + } + /// Builder for [`ClientProjectsExt::project_ip_pool_list`] /// /// [`ClientProjectsExt::project_ip_pool_list`]: super::ClientProjectsExt::project_ip_pool_list From 3af03e5cd215aed82c0039f0810966f3e8563a7f Mon Sep 17 00:00:00 2001 From: lif <> Date: Tue, 26 Mar 2024 23:46:49 -0700 Subject: [PATCH 2/5] wip - vnc proxy - command --- Cargo.lock | 1 + Cargo.toml | 1 + cli/Cargo.toml | 1 + cli/src/cli_builder.rs | 1 + cli/src/cmd_instance.rs | 151 ++++++++++++++++++++++++++++++++++++++- cli/src/generated_cli.rs | 53 ++++++++++++++ cli/src/main.rs | 1 + 7 files changed, 208 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 5e97bbe6..29b91ea6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2070,6 +2070,7 @@ dependencies = [ "test-common", "thouart", "tokio", + "tokio-tungstenite", "url", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 3267ee43..4ba9d3e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ tempfile = "3.10.1" test-common = { path = "test-common" } thouart = { git = "https://github.com/oxidecomputer/thouart.git" } tokio = { version = "1.36.0", features = ["full"] } +tokio-tungstenite = "0.20.1" toml = "0.8.12" toml_edit = "0.22.9" url = "2.5.0" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index b656df8d..a7aa12ed 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -41,6 +41,7 @@ serde_json = { workspace = true } tabwriter = { workspace = true } thouart = { workspace = true } tokio = { workspace = true } +tokio-tungstenite = { workspace = true } url = { workspace = true } uuid = { workspace = true } diff --git a/cli/src/cli_builder.rs b/cli/src/cli_builder.rs index 9da842f6..09012c77 100644 --- a/cli/src/cli_builder.rs +++ b/cli/src/cli_builder.rs @@ -274,6 +274,7 @@ fn xxx<'a>(command: CliCommand) -> Option<&'a str> { CliCommand::InstanceReboot => Some("instance reboot"), CliCommand::InstanceSerialConsole => None, // Special-cased CliCommand::InstanceSerialConsoleStream => None, // Ditto + CliCommand::InstanceVnc => None, // Ditto CliCommand::InstanceStart => Some("instance start"), CliCommand::InstanceStop => Some("instance stop"), CliCommand::InstanceExternalIpList => Some("instance external-ip list"), diff --git a/cli/src/cmd_instance.rs b/cli/src/cmd_instance.rs index 255b6d26..755b0deb 100644 --- a/cli/src/cmd_instance.rs +++ b/cli/src/cmd_instance.rs @@ -13,9 +13,14 @@ use oxide::types::{ NameOrId, }; +use futures::{SinkExt, StreamExt}; use oxide::ClientImagesExt; use oxide::ClientInstancesExt; use std::path::PathBuf; +use tokio::io::Interest; +use tokio_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Role}; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::WebSocketStream; /// Connect to or retrieve data from the instance's serial console. #[derive(Parser, Debug, Clone)] @@ -165,7 +170,6 @@ pub struct CmdInstanceSerialHistory { #[async_trait] impl RunnableCmd for CmdInstanceSerialHistory { - // cli process becomes an interactive remote shell. async fn run(&self, ctx: &oxide::context::Context) -> Result<()> { let mut req = ctx .client()? @@ -198,6 +202,151 @@ impl RunnableCmd for CmdInstanceSerialHistory { } } +/// Connect to the instance's framebuffer and input with a local VNC client. +#[derive(Parser, Debug, Clone)] +#[command(verbatim_doc_comment)] +#[command(name = "serial")] +pub struct CmdInstanceVnc { + /// Name or ID of the instance + #[clap(long, short)] + instance: NameOrId, + + /// Name or ID of the project + #[clap(long, short)] + project: Option, + // TODO: vncviewer executable, or flag that says not to +} + +#[async_trait] +impl RunnableCmd for CmdInstanceVnc { + async fn run(&self, ctx: &oxide::context::Context) -> Result<()> { + let mut req = ctx.client()?.instance_vnc().instance(self.instance.clone()); + + if let Some(value) = &self.project { + req = req.project(value.clone()); + } else if let NameOrId::Name(_) = &self.instance { + // on the server end, the connection is upgraded by the server + // before the worker thread attempts to look up the instance. + anyhow::bail!("Must provide --project when specifying instance by name rather than ID"); + } + + // TODO: custom listen address + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + // yes, two ':' between IP and port. otherwise VNC adds 5900 to it! + let vncviewer_arg = format!("{ip}::{port}", ip = addr.ip(), port = addr.port()); + + // TODO: custom args etc. + let mut cmd = std::process::Command::new("vncviewer"); + cmd.arg(&vncviewer_arg); + let child_res = cmd.spawn(); + if child_res.is_err() { + eprintln!( + "Please connect a VNC client to {ip} on TCP port {port}.\nFor example: vncviewer {vncviewer_arg}", + ip = addr.ip(), + port = addr.port(), + vncviewer_arg = vncviewer_arg, + ); + } + + // TODO: clearer error case communication + let Ok((tcp_stream, _addr)) = listener.accept().await else { + anyhow::bail!("Failed to accept connection from local VNC client"); + }; + + // okay, we have a local client, now actually start requesting I/O through nexus + let upgraded = req.send().await.map_err(|e| e.into_untyped())?.into_inner(); + + let mut ws_stream = WebSocketStream::from_raw_socket(upgraded, Role::Client, None).await; + + let mut stored_out_buf: Option> = None; + loop { + let tcp_ready = if stored_out_buf.is_some() { + tcp_stream.ready(Interest::READABLE | Interest::WRITABLE) + } else { + tcp_stream.ready(Interest::READABLE) + }; + tokio::select! { + ready_res = tcp_ready => { + let ready = ready_res?; + if ready.is_readable() { + // TODO: find a justifiable size + let mut buf = [0u8; 65536]; + match tcp_stream.try_read(&mut buf) { + Ok(num_bytes) => { + ws_stream.send(Message::Binary(Vec::from(&buf[..num_bytes]))).await?; + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; // false positive, ignore + } + Err(e) => { + ws_stream.send(Message::Close(None)).await?; + return Err(e.into()); + } + } + } + if ready.is_writable() { + let buf = stored_out_buf + .take() + .expect("tcp_ready should never be writable if we have nothing to write"); + match tcp_stream.try_write(&buf) { + Ok(num_bytes) => { + stored_out_buf = Some(Vec::from(&buf[num_bytes..])); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; // false positive, ignore + } + Err(e) => { + ws_stream.send(Message::Close(None)).await?; + return Err(e.into()); + } + } + } + } + out_buf = ws_stream.next() => { + match out_buf { + Some(Ok(Message::Binary(data))) => { + if let Some(existing) = &mut stored_out_buf { + existing.extend_from_slice(&data); + } else { + stored_out_buf = Some(data); + } + } + Some(Ok(Message::Close(Some(CloseFrame {code, reason})))) => { + match code { + CloseCode::Abnormal + | CloseCode::Error + | CloseCode::Extension + | CloseCode::Invalid + | CloseCode::Policy + | CloseCode::Protocol + | CloseCode::Size + | CloseCode::Unsupported => { + anyhow::bail!("Server disconnected: {}", reason.to_string()); + } + _ => break, + } + } + Some(Ok(Message::Close(None))) => { + eprintln!("Connection closed."); + break; + } + None => { + eprintln!("Connection lost."); + break; + } + _ => continue, + } + } + } + } + // let _: the connection may have already been dropped at this point. + let _ = ws_stream.send(Message::Close(None)).await; + + Ok(()) + } +} + /// Launch an instance from a disk image. #[derive(Parser, Debug, Clone)] #[command(verbatim_doc_comment)] diff --git a/cli/src/generated_cli.rs b/cli/src/generated_cli.rs index f2579fad..e4eb97cf 100644 --- a/cli/src/generated_cli.rs +++ b/cli/src/generated_cli.rs @@ -66,6 +66,7 @@ impl Cli { CliCommand::InstanceSshPublicKeyList => Self::cli_instance_ssh_public_key_list(), CliCommand::InstanceStart => Self::cli_instance_start(), CliCommand::InstanceStop => Self::cli_instance_stop(), + CliCommand::InstanceVnc => Self::cli_instance_vnc(), CliCommand::ProjectIpPoolList => Self::cli_project_ip_pool_list(), CliCommand::ProjectIpPoolView => Self::cli_project_ip_pool_view(), CliCommand::LoginLocal => Self::cli_login_local(), @@ -1922,6 +1923,25 @@ impl Cli { .about("Stop instance") } + pub fn cli_instance_vnc() -> clap::Command { + clap::Command::new("") + .arg( + clap::Arg::new("instance") + .long("instance") + .value_parser(clap::value_parser!(types::NameOrId)) + .required(true) + .help("Name or ID of the instance"), + ) + .arg( + clap::Arg::new("project") + .long("project") + .value_parser(clap::value_parser!(types::NameOrId)) + .required(false) + .help("Name or ID of the project"), + ) + .about("Stream instance VNC framebuffer") + } + pub fn cli_project_ip_pool_list() -> clap::Command { clap::Command::new("") .arg( @@ -5467,6 +5487,7 @@ impl Cli { } CliCommand::InstanceStart => self.execute_instance_start(matches).await, CliCommand::InstanceStop => self.execute_instance_stop(matches).await, + CliCommand::InstanceVnc => self.execute_instance_vnc(matches).await, CliCommand::ProjectIpPoolList => self.execute_project_ip_pool_list(matches).await, CliCommand::ProjectIpPoolView => self.execute_project_ip_pool_view(matches).await, CliCommand::LoginLocal => self.execute_login_local(matches).await, @@ -7414,6 +7435,28 @@ impl Cli { } } + pub async fn execute_instance_vnc(&self, matches: &clap::ArgMatches) -> anyhow::Result<()> { + let mut request = self.client.instance_vnc(); + if let Some(value) = matches.get_one::("instance") { + request = request.instance(value.clone()); + } + + if let Some(value) = matches.get_one::("project") { + request = request.project(value.clone()); + } + + self.config.execute_instance_vnc(matches, &mut request)?; + let result = request.send().await; + match result { + Ok(r) => { + todo!() + } + Err(r) => { + todo!() + } + } + } + pub async fn execute_project_ip_pool_list( &self, matches: &clap::ArgMatches, @@ -11853,6 +11896,14 @@ pub trait CliConfig { Ok(()) } + fn execute_instance_vnc( + &self, + matches: &clap::ArgMatches, + request: &mut builder::InstanceVnc, + ) -> anyhow::Result<()> { + Ok(()) + } + fn execute_project_ip_pool_list( &self, matches: &clap::ArgMatches, @@ -12925,6 +12976,7 @@ pub enum CliCommand { InstanceSshPublicKeyList, InstanceStart, InstanceStop, + InstanceVnc, ProjectIpPoolList, ProjectIpPoolView, LoginLocal, @@ -13110,6 +13162,7 @@ impl CliCommand { CliCommand::InstanceSshPublicKeyList, CliCommand::InstanceStart, CliCommand::InstanceStop, + CliCommand::InstanceVnc, CliCommand::ProjectIpPoolList, CliCommand::ProjectIpPoolView, CliCommand::LoginLocal, diff --git a/cli/src/main.rs b/cli/src/main.rs index dc750a41..b985c383 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -47,6 +47,7 @@ pub fn make_cli() -> NewCli<'static> { .add_custom::("version") .add_custom::("disk import") .add_custom::("instance serial") + .add_custom::("instance vnc") .add_custom::("instance from-image") .add_custom::("completion") } From 98e18142f8f746ccf76af3572faa32b8ac8f15c0 Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 03:47:06 -0700 Subject: [PATCH 3/5] wip - vnc proxy - oops --- cli/src/cmd_instance.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cli/src/cmd_instance.rs b/cli/src/cmd_instance.rs index 755b0deb..c4f00afa 100644 --- a/cli/src/cmd_instance.rs +++ b/cli/src/cmd_instance.rs @@ -291,7 +291,10 @@ impl RunnableCmd for CmdInstanceVnc { .expect("tcp_ready should never be writable if we have nothing to write"); match tcp_stream.try_write(&buf) { Ok(num_bytes) => { - stored_out_buf = Some(Vec::from(&buf[num_bytes..])); + if num_bytes < buf.len() { + // TODO: less allocating + stored_out_buf = Some(Vec::from(&buf[num_bytes..])); + } } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { continue; // false positive, ignore From bae87085f8992ca20311c38e34652c27d090d493 Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 11:45:03 -0700 Subject: [PATCH 4/5] wip - vnc proxy - less awkward --- cli/src/cmd_instance.rs | 97 ++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 49 deletions(-) diff --git a/cli/src/cmd_instance.rs b/cli/src/cmd_instance.rs index c4f00afa..808224ab 100644 --- a/cli/src/cmd_instance.rs +++ b/cli/src/cmd_instance.rs @@ -17,7 +17,7 @@ use futures::{SinkExt, StreamExt}; use oxide::ClientImagesExt; use oxide::ClientInstancesExt; use std::path::PathBuf; -use tokio::io::Interest; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Role}; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::WebSocketStream; @@ -257,62 +257,57 @@ impl RunnableCmd for CmdInstanceVnc { // okay, we have a local client, now actually start requesting I/O through nexus let upgraded = req.send().await.map_err(|e| e.into_untyped())?.into_inner(); - let mut ws_stream = WebSocketStream::from_raw_socket(upgraded, Role::Client, None).await; + let ws = WebSocketStream::from_raw_socket(upgraded, Role::Client, None).await; - let mut stored_out_buf: Option> = None; - loop { - let tcp_ready = if stored_out_buf.is_some() { - tcp_stream.ready(Interest::READABLE | Interest::WRITABLE) - } else { - tcp_stream.ready(Interest::READABLE) - }; - tokio::select! { - ready_res = tcp_ready => { - let ready = ready_res?; - if ready.is_readable() { - // TODO: find a justifiable size - let mut buf = [0u8; 65536]; - match tcp_stream.try_read(&mut buf) { + let (mut ws_sink, mut ws_stream) = ws.split(); + let (mut tcp_reader, mut tcp_writer) = tcp_stream.into_split(); + let (closed_tx, mut closed_rx) = tokio::sync::oneshot::channel::<()>(); + + let mut jh = tokio::spawn(async move { + // medium-sized websocket payload + let mut tcp_read_buf = vec![0u8; 65535]; + loop { + tokio::select! { + _ = &mut closed_rx => break, + num_bytes_res = tcp_reader.read(&mut tcp_read_buf) => { + match num_bytes_res { Ok(num_bytes) => { - ws_stream.send(Message::Binary(Vec::from(&buf[..num_bytes]))).await?; - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; // false positive, ignore + ws_sink + .send(Message::Binary(Vec::from(&tcp_read_buf[..num_bytes]))) + .await?; } Err(e) => { - ws_stream.send(Message::Close(None)).await?; - return Err(e.into()); - } - } - } - if ready.is_writable() { - let buf = stored_out_buf - .take() - .expect("tcp_ready should never be writable if we have nothing to write"); - match tcp_stream.try_write(&buf) { - Ok(num_bytes) => { - if num_bytes < buf.len() { - // TODO: less allocating - stored_out_buf = Some(Vec::from(&buf[num_bytes..])); - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; // false positive, ignore - } - Err(e) => { - ws_stream.send(Message::Close(None)).await?; - return Err(e.into()); + ws_sink.send(Message::Close(None)).await?; + anyhow::bail!("Local client disconnected: {}", e); } } } } - out_buf = ws_stream.next() => { - match out_buf { + } + Ok(ws_sink) + }); + + let mut close_frame = None; + loop { + tokio::select! { + _ = &mut jh => break, + msg = ws_stream.next() => { + match msg { Some(Ok(Message::Binary(data))) => { - if let Some(existing) = &mut stored_out_buf { - existing.extend_from_slice(&data); - } else { - stored_out_buf = Some(data); + let mut start = 0; + while start < data.len() { + match tcp_writer.write(&data[start..]).await { + Ok(num_bytes) => { + start += num_bytes; + } + Err(e) => { + close_frame = Some(CloseFrame { + code: CloseCode::Error, + reason: e.to_string().into(), + }); + break; + } + } } } Some(Ok(Message::Close(Some(CloseFrame {code, reason})))) => { @@ -343,8 +338,12 @@ impl RunnableCmd for CmdInstanceVnc { } } } + // let _: the connection may have already been dropped at this point. - let _ = ws_stream.send(Message::Close(None)).await; + let _ = closed_tx.send(()).is_ok(); + if let Ok(Ok(mut ws_sink)) = jh.await { + let _ = ws_sink.send(Message::Close(close_frame)).await.is_ok(); + } Ok(()) } From 27cc092d09af2f9a18f5162f07dc2071ce79762a Mon Sep 17 00:00:00 2001 From: lif <> Date: Thu, 9 May 2024 22:39:36 -0700 Subject: [PATCH 5/5] wip - vnc proxy - check that instance is running first --- cli/src/cmd_instance.rs | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/cli/src/cmd_instance.rs b/cli/src/cmd_instance.rs index 808224ab..b5b1f80e 100644 --- a/cli/src/cmd_instance.rs +++ b/cli/src/cmd_instance.rs @@ -8,6 +8,7 @@ use crate::RunnableCmd; use anyhow::Result; use async_trait::async_trait; use clap::Parser; +use oxide::types::InstanceState; use oxide::types::{ ByteCount, DiskSource, ExternalIpCreate, InstanceCpuCount, InstanceDiskAttachment, Name, NameOrId, @@ -220,16 +221,29 @@ pub struct CmdInstanceVnc { #[async_trait] impl RunnableCmd for CmdInstanceVnc { async fn run(&self, ctx: &oxide::context::Context) -> Result<()> { + let mut prereq = ctx + .client()? + .instance_view() + .instance(self.instance.clone()); let mut req = ctx.client()?.instance_vnc().instance(self.instance.clone()); if let Some(value) = &self.project { req = req.project(value.clone()); + prereq = prereq.project(value.clone()); } else if let NameOrId::Name(_) = &self.instance { // on the server end, the connection is upgraded by the server // before the worker thread attempts to look up the instance. anyhow::bail!("Must provide --project when specifying instance by name rather than ID"); } + let view = prereq.send().await?; + if view.run_state != InstanceState::Running { + anyhow::bail!( + "Instance must be running to connect to VNC, but it is currently {:?}", + view.run_state + ); + } + // TODO: custom listen address let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; let addr = listener.local_addr()?; @@ -288,9 +302,18 @@ impl RunnableCmd for CmdInstanceVnc { }); let mut close_frame = None; + let mut task_joined = false; loop { tokio::select! { - _ = &mut jh => break, + res = &mut jh => { + if let Ok(Ok(mut ws_sink)) = res { + // take() avoids borrow checker complaint about code that sends close + // if we don't join the handle in the select (below loop) + let _ = ws_sink.send(Message::Close(close_frame.take())).await.is_ok(); + } + task_joined = true; + break; + } msg = ws_stream.next() => { match msg { Some(Ok(Message::Binary(data))) => { @@ -339,10 +362,12 @@ impl RunnableCmd for CmdInstanceVnc { } } - // let _: the connection may have already been dropped at this point. - let _ = closed_tx.send(()).is_ok(); - if let Ok(Ok(mut ws_sink)) = jh.await { - let _ = ws_sink.send(Message::Close(close_frame)).await.is_ok(); + if !task_joined { + // let _: the connection may have already been dropped at this point. + let _ = closed_tx.send(()).is_ok(); + if let Ok(Ok(mut ws_sink)) = jh.await { + let _ = ws_sink.send(Message::Close(close_frame)).await.is_ok(); + } } Ok(())