diff --git a/Cargo.lock b/Cargo.lock index 2166d0197b..099328f456 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1587,11 +1587,11 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.5.13" +version = "4.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa3c596da3cf0983427b0df0dba359df9182c13bd5b519b585a482b0c351f4e8" +checksum = "9c677cd0126f3026d8b093fa29eae5d812fde5c05bc66dbb29d0374eea95113a" dependencies = [ - "clap 4.5.13", + "clap 4.5.15", ] [[package]] @@ -3378,6 +3378,7 @@ dependencies = [ "inquire", "iso8601", "itertools 0.13.0", + "lenient_bool", "libtest-mimic", "log", "native-tls", @@ -5098,6 +5099,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" +[[package]] +name = "lenient_bool" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57eeaed462e96d277051c219bf5e2ed22aedf7705d8945d2decb8b2db8fb954d" + [[package]] name = "lexical-core" version = "0.7.6" @@ -7209,9 +7216,9 @@ dependencies = [ [[package]] name = "ringbuf" -version = "0.3.3" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79abed428d1fd2a128201cec72c5f6938e2da607c6f3745f769fabea399d950a" +checksum = "5c65e4c865bc3d2e3294493dff0acf7e6c259d066e34e22059fa9c39645c3636" dependencies = [ "crossbeam-utils", ] diff --git a/Cargo.toml b/Cargo.toml index 7818cc7e11..ea446d1664 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ exclude = [ "test-components/http-client-2", "test-components/interruption", "test-components/key-value-service", + "test-components/logging", "test-components/networking", "test-components/option-service", "test-components/promise", diff --git a/golem-api-grpc/build.rs b/golem-api-grpc/build.rs index 3baaca2131..754b71bbb9 100644 --- a/golem-api-grpc/build.rs +++ b/golem-api-grpc/build.rs @@ -12,30 +12,6 @@ fn main() -> Result<(), Box> { .file_descriptor_set_path(out_dir.join("services.bin")) .extern_path(".wasm.rpc", "::golem_wasm_rpc::protobuf") .extern_path(".wasm.ast", "::golem_wasm_ast::analysis::protobuf") - .type_attribute( - "golem.worker.LogEvent", - "#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]", - ) - .type_attribute( - "golem.worker.LogEvent.event", - "#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]", - ) - .type_attribute( - "golem.worker.StdOutLog", - "#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]", - ) - .type_attribute( - "golem.worker.StdErrLog", - "#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]", - ) - .type_attribute( - "golem.worker.Level", - "#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]", - ) - .type_attribute( - "golem.worker.Log", - "#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]", - ) .include_file("mod.rs") .compile( &[ diff --git a/golem-api-grpc/proto/golem/worker/log_event.proto b/golem-api-grpc/proto/golem/worker/log_event.proto index ef915b2061..7f6d734c33 100644 --- a/golem-api-grpc/proto/golem/worker/log_event.proto +++ b/golem-api-grpc/proto/golem/worker/log_event.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package golem.worker; +import "google/protobuf/timestamp.proto"; + message LogEvent { oneof event { StdOutLog stdout = 1; @@ -12,10 +14,12 @@ message LogEvent { message StdOutLog { string message = 1; + google.protobuf.Timestamp timestamp = 2; } message StdErrLog { string message = 1; + google.protobuf.Timestamp timestamp = 2; } enum Level { @@ -31,4 +35,5 @@ message Log { Level level = 1; string context = 2; string message = 3; + google.protobuf.Timestamp timestamp = 4; } diff --git a/golem-cli/Cargo.toml b/golem-cli/Cargo.toml index 4191ee9bdc..b273321816 100644 --- a/golem-cli/Cargo.toml +++ b/golem-cli/Cargo.toml @@ -41,6 +41,7 @@ http = { workspace = true } hyper = { workspace = true } indoc = "2.0.4" itertools = { workspace = true } +lenient_bool = "0.1.1" log = { version = "0.4.22" } native-tls = "0.2.11" openapiv3 = { workspace = true } diff --git a/golem-cli/src/clients/worker.rs b/golem-cli/src/clients/worker.rs index cc1a649b8d..a88458564a 100644 --- a/golem-cli/src/clients/worker.rs +++ b/golem-cli/src/clients/worker.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::command::worker::WorkerConnectOptions; use crate::model::{ - GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode, + Format, GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode, WorkersMetadataResponse, }; use async_trait::async_trait; @@ -67,7 +68,24 @@ pub trait WorkerClient { count: Option, precise: Option, ) -> Result; - async fn connect(&self, worker_urn: WorkerUrn) -> Result<(), GolemError>; + async fn connect( + &self, + worker_urn: WorkerUrn, + connect_options: WorkerConnectOptions, + format: Format, + ) -> Result<(), GolemError>; + + async fn connect_forever( + &self, + worker_urn: WorkerUrn, + connect_options: WorkerConnectOptions, + format: Format, + ) -> Result<(), GolemError> { + loop { + self.connect(worker_urn.clone(), connect_options.clone(), format) + .await?; + } + } async fn update( &self, diff --git a/golem-cli/src/command/worker.rs b/golem-cli/src/command/worker.rs index b3471cc1fa..5ae2d50e23 100644 --- a/golem-cli/src/command/worker.rs +++ b/golem-cli/src/command/worker.rs @@ -14,20 +14,23 @@ use crate::command::ComponentRefSplit; use clap::builder::ValueParser; -use clap::{ArgMatches, Error, FromArgMatches, Subcommand}; +use clap::{ArgMatches, Args, Error, FromArgMatches, Subcommand}; use golem_client::model::ScanCursor; use golem_common::model::WorkerId; use golem_common::uri::oss::uri::{ComponentUri, WorkerUri}; use golem_common::uri::oss::url::{ComponentUrl, WorkerUrl}; use golem_common::uri::oss::urn::{ComponentUrn, WorkerUrn}; +use std::sync::Arc; +use tokio::join; +use tokio::task::spawn; use crate::model::{ Format, GolemError, GolemResult, IdempotencyKey, JsonValueParser, WorkerName, WorkerUpdateMode, }; use crate::oss::model::OssContext; -use crate::parse_key_val; use crate::service::project::ProjectResolver; use crate::service::worker::WorkerService; +use crate::{parse_bool, parse_key_val}; #[derive(clap::Args, Debug, Clone)] pub struct OssWorkerNameOrUriArg { @@ -211,6 +214,42 @@ impl From<&OssWorkerUriArg> for OssWorkerNameOrUriArg { } } +#[derive(Args, Debug, Clone)] +pub struct WorkerConnectOptions { + /// Use colored log lines in text mode + #[arg(long, + action=clap::ArgAction::Set, + value_parser = parse_bool, + default_missing_value="true", + default_value_t=true, + num_args = 0..=1, + require_equals = false, + )] + pub colors: bool, + + /// Show a timestamp for each log line + #[arg(long, + action=clap::ArgAction::Set, + value_parser = parse_bool, + default_missing_value="true", + default_value_t=true, + num_args = 0..=1, + require_equals = false, + )] + pub show_timestamp: bool, + + /// Show the source or log level for each log line + #[arg(long, + action=clap::ArgAction::Set, + value_parser = parse_bool, + default_missing_value="true", + default_value_t=false, + num_args = 0..=1, + require_equals = false, + )] + pub show_level: bool, +} + #[derive(Subcommand, Debug)] #[command()] pub enum WorkerSubcommand { @@ -254,6 +293,13 @@ pub enum WorkerSubcommand { #[command(flatten)] parameters: InvokeParameterList, + + /// Connect to the worker during the invocation and show its logs + #[arg(long)] + connect: bool, + + #[command(flatten)] + connect_options: WorkerConnectOptions, }, /// Triggers a function invocation on a worker without waiting for its completion @@ -272,6 +318,13 @@ pub enum WorkerSubcommand { #[command(flatten)] parameters: InvokeParameterList, + + /// Connect to the worker and show its logs + #[arg(long)] + connect: bool, + + #[command(flatten)] + connect_options: WorkerConnectOptions, }, /// Connect to a worker and live stream its standard output, error and log channels @@ -279,6 +332,8 @@ pub enum WorkerSubcommand { Connect { #[command(flatten)] worker_ref: WorkerRef, + #[command(flatten)] + connect_options: WorkerConnectOptions, }, /// Interrupts a running worker @@ -373,11 +428,14 @@ impl WorkerRefSplit for OssWorkerUriArg { } impl WorkerSubcommand { - pub async fn handle( + pub async fn handle< + ProjectRef: Send + Sync + 'static, + ProjectContext: Clone + Send + Sync + 'static, + >( self, format: Format, - service: &(dyn WorkerService + Send + Sync), - projects: &(dyn ProjectResolver + Send + Sync), + service: Arc + Send + Sync>, + projects: Arc + Send + Sync>, ) -> Result where ComponentRef: ComponentRefSplit, @@ -402,44 +460,94 @@ impl WorkerSubcommand { let (worker_uri, project_ref) = worker_ref.split(); let project_id = projects.resolve_id_or_default_opt(project_ref).await?; - service - .invoke_and_await( - format, - worker_uri, - idempotency_key, - function, - parameters.parameters, - parameters.wave, - project_id, - ) - .await + if connect { + let worker_uri_clone = worker_uri.clone(); + let project_id_clone = project_id.clone(); + let service_clone = service.clone(); + let connect_handle = spawn(async move { + let _ = service_clone + .connect(worker_uri_clone, project_id_clone, connect_options, format) + .await; + }); + let result = service + .invoke_and_await( + format, + worker_uri, + idempotency_key, + function, + parameters.parameters, + parameters.wave, + project_id, + ) + .await; + + connect_handle.abort(); + result + } else { + service + .invoke_and_await( + format, + worker_uri, + idempotency_key, + function, + parameters.parameters, + parameters.wave, + project_id, + ) + .await + } } WorkerSubcommand::Invoke { worker_ref, idempotency_key, function, parameters, + connect, + connect_options, } => { let (worker_uri, project_ref) = worker_ref.split(); let project_id = projects.resolve_id_or_default_opt(project_ref).await?; - service - .invoke( - worker_uri, + + if connect { + let invoke_future = service.invoke( + worker_uri.clone(), idempotency_key, function, parameters.parameters, parameters.wave, - project_id, - ) - .await + project_id.clone(), + ); + let connect_future = + service.connect(worker_uri, project_id, connect_options, format); + + join!(invoke_future, connect_future).0 + } else { + service + .invoke( + worker_uri, + idempotency_key, + function, + parameters.parameters, + parameters.wave, + project_id, + ) + .await + } } - WorkerSubcommand::Connect { worker_ref } => { + WorkerSubcommand::Connect { + worker_ref, + connect_options, + } => { let (worker_uri, project_ref) = worker_ref.split(); let project_id = projects.resolve_id_or_default_opt(project_ref).await?; - service.connect(worker_uri, project_id).await + service + .connect(worker_uri, project_id, connect_options, format) + .await } WorkerSubcommand::Interrupt { worker_ref } => { let (worker_uri, project_ref) = worker_ref.split(); diff --git a/golem-cli/src/connect_output.rs b/golem-cli/src/connect_output.rs new file mode 100644 index 0000000000..e142afa468 --- /dev/null +++ b/golem-cli/src/connect_output.rs @@ -0,0 +1,212 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::command::worker::WorkerConnectOptions; +use crate::model::Format; +use colored::Colorize; +use golem_common::model::{LogLevel, Timestamp}; +use std::fmt::Write; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct ConnectOutput { + state: Arc>, + options: WorkerConnectOptions, + format: Format, +} + +struct ConnectOutputState { + pub last_stdout_timestamp: Timestamp, + pub stdout: String, + pub last_stderr_timestamp: Timestamp, + pub stderr: String, +} + +impl ConnectOutput { + pub fn new(options: WorkerConnectOptions, format: Format) -> Self { + ConnectOutput { + state: Arc::new(Mutex::new(ConnectOutputState { + last_stdout_timestamp: Timestamp::now_utc(), + stdout: String::new(), + last_stderr_timestamp: Timestamp::now_utc(), + stderr: String::new(), + })), + options, + format, + } + } + + pub async fn emit_stdout(&self, timestamp: Timestamp, message: String) { + let mut state = self.state.lock().await; + state.last_stdout_timestamp = timestamp; + + let lines = message.lines().collect::>(); + for (idx, line) in lines.iter().enumerate() { + if idx == (lines.len() - 1) { + // last line, if message did not end with newline, just store it + if message.ends_with('\n') { + self.print_stdout(timestamp, &format!("{}{}", state.stdout, line)); + state.stdout = String::new(); + } else { + state.stdout = format!("{}{}", state.stdout, line); + } + } else if idx == 0 { + // first line, there are more + self.print_stdout(timestamp, &format!("{}{}", state.stdout, line)); + state.stdout = String::new(); + } else { + // middle line + self.print_stdout(timestamp, line); + } + } + } + + pub async fn emit_stderr(&self, timestamp: Timestamp, message: String) { + let mut state = self.state.lock().await; + state.last_stderr_timestamp = timestamp; + + let lines = message.lines().collect::>(); + for (idx, line) in lines.iter().enumerate() { + if idx == (lines.len() - 1) { + // last line, if message did not end with newline, just store it + if message.ends_with('\n') { + self.print_stderr(timestamp, &format!("{}{}", state.stderr, line)); + state.stderr = String::new(); + } else { + state.stderr = format!("{}{}", state.stderr, line); + } + } else if idx == 0 { + // first line, there are more + self.print_stderr(timestamp, &format!("{}{}", state.stderr, line)); + state.stderr = String::new(); + } else { + // middle line + self.print_stderr(timestamp, line); + } + } + } + + pub fn emit_log( + &self, + timestamp: Timestamp, + level: LogLevel, + context: String, + message: String, + ) { + let level_str = match level { + LogLevel::Trace => "TRACE", + LogLevel::Debug => "DEBUG", + LogLevel::Info => "INFO", + LogLevel::Warn => "WARN", + LogLevel::Error => "ERROR", + LogLevel::Critical => "CRITICAL", + }; + + match self.format { + Format::Json => self.json(level_str, &context, &message), + Format::Yaml => self.yaml(level_str, &context, &message), + Format::Text => { + let prefix = self.prefix(timestamp, level_str); + self.colored(level, &format!("{prefix}[{context}] {message}")); + } + } + } + + pub async fn flush(&self) { + let mut state = self.state.lock().await; + if !state.stdout.is_empty() { + self.print_stdout(state.last_stdout_timestamp, &state.stdout); + state.stdout = String::new(); + } + if !state.stderr.is_empty() { + self.print_stderr(state.last_stdout_timestamp, &state.stderr); + state.stderr = String::new(); + } + } + + fn print_stdout(&self, timestamp: Timestamp, message: &str) { + match self.format { + Format::Json => self.json("STDOUT", "", message), + Format::Yaml => self.yaml("STDOUT", "", message), + Format::Text => { + let prefix = self.prefix(timestamp, "STDOUT"); + self.colored(LogLevel::Info, &format!("{prefix}{message}")); + } + } + } + + fn print_stderr(&self, timestamp: Timestamp, message: &str) { + match self.format { + Format::Json => self.json("STDERR", "", message), + Format::Yaml => self.yaml("STDERR", "", message), + Format::Text => { + let prefix = self.prefix(timestamp, "STDERR"); + self.colored(LogLevel::Error, &format!("{prefix}{message}")); + } + } + } + + fn json(&self, level_or_source: &str, context: &str, message: &str) { + let json = self.json_value(level_or_source, context, message); + println!("{}", json); + } + + fn yaml(&self, level_or_source: &str, context: &str, message: &str) { + let json = self.json_value(level_or_source, context, message); + println!("{}", serde_yaml::to_string(&json).unwrap()); + } + + fn json_value(&self, level_or_source: &str, context: &str, message: &str) -> serde_json::Value { + serde_json::json!({ + "timestamp": Timestamp::now_utc(), + "level": level_or_source, + "context": context, + "message": message, + }) + } + + fn colored(&self, level: LogLevel, s: &str) { + if self.options.colors { + let colored = match level { + LogLevel::Trace => s.blue(), + LogLevel::Debug => s.green(), + LogLevel::Info => s.white(), + LogLevel::Warn => s.yellow(), + LogLevel::Error => s.red(), + LogLevel::Critical => s.red().bold(), + }; + println!("{}", colored); + } else { + println!("{}", s); + } + } + + fn prefix(&self, timestamp: Timestamp, level_or_source: &str) -> String { + let mut result = String::new(); + if self.options.show_timestamp { + let _ = write!(&mut result, "[{timestamp}] "); + } + if self.options.show_level { + let _ = result.write_char('['); + let _ = result.write_str(level_or_source); + for _ in level_or_source.len()..8 { + let _ = result.write_char(' '); + } + let _ = result.write_char(']'); + let _ = result.write_char(' '); + } + result + } +} diff --git a/golem-cli/src/factory.rs b/golem-cli/src/factory.rs index c68f436c43..ff70e0d669 100644 --- a/golem-cli/src/factory.rs +++ b/golem-cli/src/factory.rs @@ -27,6 +27,7 @@ use crate::service::worker::{ ComponentServiceBuilder, WorkerClientBuilder, WorkerService, WorkerServiceLive, }; use std::fmt::Display; +use std::sync::Arc; pub trait ServiceFactory { type SecurityContext: Clone + Send + Sync + 'static; @@ -41,7 +42,7 @@ pub trait ServiceFactory { &self, auth: &Self::SecurityContext, ) -> Result< - Box + Send + Sync>, + Arc + Send + Sync>, GolemError, >; fn component_client( @@ -70,13 +71,13 @@ pub trait ServiceFactory { &self, auth: &Self::SecurityContext, ) -> Result< - Box + Send + Sync>, + Arc + Send + Sync>, GolemError, > where Self: Send + Sync + Sized + 'static, { - Ok(Box::new(WorkerServiceLive { + Ok(Arc::new(WorkerServiceLive { client: self.worker_client(auth)?, components: self.component_service(auth)?, client_builder: Box::new(self.with_auth(auth)), diff --git a/golem-cli/src/lib.rs b/golem-cli/src/lib.rs index 6d605dae9a..7af5d9e889 100644 --- a/golem-cli/src/lib.rs +++ b/golem-cli/src/lib.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use lenient_bool::LenientBool; + pub mod clients; pub mod cloud; pub mod command; pub mod config; +pub mod connect_output; pub mod examples; pub mod factory; pub mod init; @@ -32,3 +35,10 @@ pub fn parse_key_val( .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?; Ok((s[..pos].parse()?, s[pos + 1..].parse()?)) } + +pub fn parse_bool(s: &str) -> Result> { + match s.parse::() { + Ok(b) => Ok(b.into()), + Err(_) => Err(format!("invalid boolean: `{s}`"))?, + } +} diff --git a/golem-cli/src/oss/clients/worker.rs b/golem-cli/src/oss/clients/worker.rs index 3ed03c81c9..cd2b7c368c 100644 --- a/golem-cli/src/oss/clients/worker.rs +++ b/golem-cli/src/oss/clients/worker.rs @@ -15,8 +15,10 @@ use std::time::Duration; use crate::clients::worker::WorkerClient; +use crate::command::worker::WorkerConnectOptions; +use crate::connect_output::ConnectOutput; use crate::model::{ - GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode, + Format, GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode, WorkersMetadataResponse, }; use async_trait::async_trait; @@ -27,14 +29,14 @@ use golem_client::model::{ WorkerFilter, WorkerId, WorkersMetadataRequest, }; use golem_client::{Context, Error}; +use golem_common::model::WorkerEvent; use golem_common::uri::oss::urn::{ComponentUrn, WorkerUrn}; use native_tls::TlsConnector; -use serde::Deserialize; use tokio::{task, time}; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::protocol::Message; use tokio_tungstenite::{connect_async_tls_with_config, Connector}; -use tracing::{debug, info}; +use tracing::{debug, error, info, trace}; #[derive(Clone)] pub struct WorkerClientLive { @@ -230,7 +232,12 @@ impl WorkerClient for WorkerCl .into()) } - async fn connect(&self, worker_urn: WorkerUrn) -> Result<(), GolemError> { + async fn connect( + &self, + worker_urn: WorkerUrn, + connect_options: WorkerConnectOptions, + format: Format, + ) -> Result<(), GolemError> { let mut url = self.context.base_url.clone(); let ws_schema = if url.scheme() == "http" { "ws" } else { "wss" }; @@ -271,6 +278,8 @@ impl WorkerClient for WorkerCl None }; + info!("Connecting to {worker_urn}"); + let (ws_stream, _) = connect_async_tls_with_config(request, None, false, connector) .await .map_err(|e| match e { @@ -288,85 +297,115 @@ impl WorkerClient for WorkerCl let (mut write, read) = ws_stream.split(); let pings = task::spawn(async move { - let mut interval = time::interval(Duration::from_secs(5)); // TODO configure - + let mut interval = time::interval(Duration::from_secs(1)); // TODO configure let mut cnt: i32 = 1; loop { interval.tick().await; - write + let ping_result = write .send(Message::Ping(cnt.to_ne_bytes().to_vec())) .await - .unwrap(); // TODO: handle errors: map_err(|e| GolemError(format!("Ping failure: {e}")))?; + .map_err(|err| GolemError(format!("Worker connection ping failure: {err}"))); + + if let Err(err) = ping_result { + error!("{}", err); + break err; + } cnt += 1; } }); - let read_res = read.for_each(|message_or_error| async { - match message_or_error { - Err(error) => { - print!("Error reading message: {}", error); - } - Ok(message) => { - let instance_connect_msg = match message { - Message::Text(str) => { - let parsed: serde_json::Result = - serde_json::from_str(&str); - Some(parsed.unwrap()) // TODO: error handling - } - Message::Binary(data) => { - let parsed: serde_json::Result = - serde_json::from_slice(&data); - Some(parsed.unwrap()) // TODO: error handling - } - Message::Ping(_) => { - debug!("Ignore ping"); - None - } - Message::Pong(_) => { - debug!("Ignore pong"); - None - } - Message::Close(details) => { - match details { - Some(closed_frame) => { - print!("Connection Closed: {}", closed_frame); + let output = ConnectOutput::new(connect_options, format); + + let read_res = read.for_each(move |message_or_error| { + let output = output.clone(); + async move { + match message_or_error { + Err(error) => { + error!("Error reading message: {}", error); + } + Ok(message) => { + let instance_connect_msg = match message { + Message::Text(str) => { + let parsed: serde_json::Result = + serde_json::from_str(&str); + + match parsed { + Ok(parsed) => Some(parsed), + Err(err) => { + error!("Failed to parse worker connect message: {err}"); + None + } } - None => { - print!("Connection Closed"); + } + Message::Binary(data) => { + let parsed: serde_json::Result = + serde_json::from_slice(&data); + match parsed { + Ok(parsed) => Some(parsed), + Err(err) => { + error!("Failed to parse worker connect message: {err}"); + None + } } } - None - } - Message::Frame(_) => { - info!("Ignore unexpected frame"); - None - } - }; - - match instance_connect_msg { - None => {} - Some(msg) => match msg.event { - WorkerEvent::Stdout(StdOutLog { message }) => { - print!("{message}") + Message::Ping(_) => { + trace!("Ignore ping"); + None } - WorkerEvent::Stderr(StdErrLog { message }) => { - print!("{message}") + Message::Pong(_) => { + trace!("Ignore pong"); + None } - WorkerEvent::Log(Log { - level, - context, - message, - }) => match level { - 0 => tracing::trace!(message, context = context), - 1 => tracing::debug!(message, context = context), - 2 => tracing::info!(message, context = context), - 3 => tracing::warn!(message, context = context), - _ => tracing::error!(message, context = context), + Message::Close(details) => { + match details { + Some(closed_frame) => { + info!("Connection Closed: {}", closed_frame); + } + None => { + info!("Connection Closed"); + } + } + None + } + Message::Frame(f) => { + debug!("Ignored unexpected frame {f:?}"); + None + } + }; + + match instance_connect_msg { + None => {} + Some(msg) => match msg { + WorkerEvent::StdOut { timestamp, bytes } => { + output + .emit_stdout( + timestamp, + String::from_utf8_lossy(&bytes).to_string(), + ) + .await; + } + WorkerEvent::StdErr { timestamp, bytes } => { + output + .emit_stderr( + timestamp, + String::from_utf8_lossy(&bytes).to_string(), + ) + .await; + } + WorkerEvent::Log { + timestamp, + level, + context, + message, + } => { + output.emit_log(timestamp, level, context, message); + } + WorkerEvent::Close => {} }, - }, + } } } } @@ -375,7 +414,6 @@ impl WorkerClient for WorkerCl pin_mut!(read_res, pings); future::select(pings, read_res).await; - Ok(()) } @@ -406,35 +444,6 @@ impl WorkerClient for WorkerCl } } -#[derive(Deserialize, Debug)] -struct InstanceConnectMessage { - pub event: WorkerEvent, -} - -#[derive(Deserialize, Debug)] -enum WorkerEvent { - Stdout(StdOutLog), - Stderr(StdErrLog), - Log(Log), -} - -#[derive(Deserialize, Debug)] -struct StdOutLog { - message: String, -} - -#[derive(Deserialize, Debug)] -struct StdErrLog { - message: String, -} - -#[derive(Deserialize, Debug)] -struct Log { - pub level: i32, - pub context: String, - pub message: String, -} - fn get_worker_golem_error(status: u16, body: Vec) -> GolemError { let error: Result, serde_json::Error> = match status { 400 => serde_json::from_slice(&body).map(|body| Error::Item(WorkerError::Error400(body))), diff --git a/golem-cli/src/oss/factory.rs b/golem-cli/src/oss/factory.rs index 453682f819..2131bdcab5 100644 --- a/golem-cli/src/oss/factory.rs +++ b/golem-cli/src/oss/factory.rs @@ -28,6 +28,7 @@ use crate::oss::clients::worker::WorkerClientLive; use crate::oss::model::OssContext; use crate::service::project::{ProjectResolver, ProjectResolverOss}; use golem_client::Context; +use std::sync::Arc; use url::Url; #[derive(Debug, Clone)] @@ -109,10 +110,10 @@ impl ServiceFactory for OssServiceFactory { &self, _auth: &Self::SecurityContext, ) -> Result< - Box + Send + Sync>, + Arc + Send + Sync>, GolemError, > { - Ok(Box::new(ProjectResolverOss::DUMMY)) + Ok(Arc::new(ProjectResolverOss::DUMMY)) } fn component_client( diff --git a/golem-cli/src/oss/main.rs b/golem-cli/src/oss/main.rs index 8deac2efe4..908000380c 100644 --- a/golem-cli/src/oss/main.rs +++ b/golem-cli/src/oss/main.rs @@ -73,8 +73,8 @@ pub async fn async_main + clap::Args>( subcommand .handle( format, - factory.worker_service(ctx)?.as_ref(), - factory.project_resolver(ctx)?.as_ref(), + factory.worker_service(ctx)?, + factory.project_resolver(ctx)?, ) .await } diff --git a/golem-cli/src/service/worker.rs b/golem-cli/src/service/worker.rs index 9b288aba35..2707c85cd9 100644 --- a/golem-cli/src/service/worker.rs +++ b/golem-cli/src/service/worker.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::clients::worker::WorkerClient; +use crate::command::worker::WorkerConnectOptions; use crate::model::component::{function_params_types, show_exported_function, Component}; use crate::model::conversions::{ analysed_type_client_to_model, decode_type_annotated_value_json, @@ -89,6 +90,8 @@ pub trait WorkerService { &self, worker_uri: WorkerUri, project: Option, + connect_options: WorkerConnectOptions, + format: Format, ) -> Result; async fn interrupt( &self, @@ -540,10 +543,14 @@ impl WorkerService for WorkerServiceLive< &self, worker_uri: WorkerUri, project: Option, + connect_options: WorkerConnectOptions, + format: Format, ) -> Result { let worker_urn = self.resolve_uri(worker_uri, project).await?; - self.client.connect(worker_urn).await?; + self.client + .connect_forever(worker_urn, connect_options, format) + .await?; Err(GolemError("Unexpected connection closure".to_string())) } diff --git a/golem-cli/tests/worker.rs b/golem-cli/tests/worker.rs index ff5e418592..6b305d390d 100644 --- a/golem-cli/tests/worker.rs +++ b/golem-cli/tests/worker.rs @@ -634,8 +634,17 @@ fn worker_connect( let _ = cli.run_json(&cli_args)?; let line = rx.recv_timeout(Duration::from_secs(5))?; + let json: serde_json::Value = serde_json::from_str(&line)?; - assert_eq!(line, "Sample text written to the output"); + assert_eq!( + json.as_object() + .unwrap() + .get("message") + .unwrap() + .as_str() + .unwrap(), + "Sample text written to the output" + ); child.kill()?; diff --git a/golem-common/src/model/mod.rs b/golem-common/src/model/mod.rs index 056f364b6b..01ecca39a5 100644 --- a/golem-common/src/model/mod.rs +++ b/golem-common/src/model/mod.rs @@ -20,6 +20,12 @@ use std::ops::Add; use std::str::FromStr; use std::time::Duration; +use crate::config::RetryConfig; +use crate::model::oplog::{ + IndexedResourceKey, OplogEntry, OplogIndex, TimestampedUpdateDescription, WorkerResourceId, +}; +use crate::model::regions::DeletedRegions; +use crate::newtype_uuid; use bincode::de::read::Reader; use bincode::de::{BorrowDecoder, Decoder}; use bincode::enc::write::Writer; @@ -27,6 +33,7 @@ use bincode::enc::Encoder; use bincode::error::{DecodeError, EncodeError}; use bincode::{BorrowDecode, Decode, Encode}; use derive_more::FromStr; +use golem_api_grpc::proto::golem; use golem_api_grpc::proto::golem::worker::Cursor; use poem::http::Uri; use poem_openapi::registry::{MetaSchema, MetaSchemaRef}; @@ -37,13 +44,6 @@ use serde::{Deserialize, Serialize, Serializer}; use serde_json::Value; use uuid::Uuid; -use crate::config::RetryConfig; -use crate::model::oplog::{ - IndexedResourceKey, OplogIndex, TimestampedUpdateDescription, WorkerResourceId, -}; -use crate::model::regions::DeletedRegions; -use crate::newtype_uuid; - pub mod component_metadata; pub mod exports; pub mod oplog; @@ -1865,6 +1865,236 @@ impl From for Cursor { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)] +pub enum LogLevel { + Trace, + Debug, + Info, + Warn, + Error, + Critical, +} + +impl From for LogLevel { + fn from(value: golem_api_grpc::proto::golem::worker::Level) -> Self { + match value { + golem_api_grpc::proto::golem::worker::Level::Trace => LogLevel::Trace, + golem_api_grpc::proto::golem::worker::Level::Debug => LogLevel::Debug, + golem_api_grpc::proto::golem::worker::Level::Info => LogLevel::Info, + golem_api_grpc::proto::golem::worker::Level::Warn => LogLevel::Warn, + golem_api_grpc::proto::golem::worker::Level::Error => LogLevel::Error, + golem_api_grpc::proto::golem::worker::Level::Critical => LogLevel::Critical, + } + } +} + +impl From for golem_api_grpc::proto::golem::worker::Level { + fn from(value: LogLevel) -> Self { + match value { + LogLevel::Trace => golem_api_grpc::proto::golem::worker::Level::Trace, + LogLevel::Debug => golem_api_grpc::proto::golem::worker::Level::Debug, + LogLevel::Info => golem_api_grpc::proto::golem::worker::Level::Info, + LogLevel::Warn => golem_api_grpc::proto::golem::worker::Level::Warn, + LogLevel::Error => golem_api_grpc::proto::golem::worker::Level::Error, + LogLevel::Critical => golem_api_grpc::proto::golem::worker::Level::Critical, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)] +pub enum WorkerEvent { + StdOut { + timestamp: Timestamp, + bytes: Vec, + }, + StdErr { + timestamp: Timestamp, + bytes: Vec, + }, + Log { + timestamp: Timestamp, + level: LogLevel, + context: String, + message: String, + }, + Close, +} + +impl WorkerEvent { + pub fn stdout(bytes: Vec) -> WorkerEvent { + WorkerEvent::StdOut { + timestamp: Timestamp::now_utc(), + bytes, + } + } + + pub fn stderr(bytes: Vec) -> WorkerEvent { + WorkerEvent::StdErr { + timestamp: Timestamp::now_utc(), + bytes, + } + } + + pub fn log(level: LogLevel, context: &str, message: &str) -> WorkerEvent { + WorkerEvent::Log { + timestamp: Timestamp::now_utc(), + level, + context: context.to_string(), + message: message.to_string(), + } + } + + pub fn as_oplog_entry(&self) -> Option { + match self { + WorkerEvent::StdOut { timestamp, bytes } => Some(OplogEntry::Log { + timestamp: *timestamp, + level: oplog::LogLevel::Stdout, + context: String::new(), + message: String::from_utf8_lossy(bytes).to_string(), + }), + WorkerEvent::StdErr { timestamp, bytes } => Some(OplogEntry::Log { + timestamp: *timestamp, + level: oplog::LogLevel::Stdout, + context: String::new(), + message: String::from_utf8_lossy(bytes).to_string(), + }), + WorkerEvent::Log { + timestamp, + level, + context, + message, + } => Some(OplogEntry::Log { + timestamp: *timestamp, + level: match level { + LogLevel::Trace => oplog::LogLevel::Trace, + LogLevel::Debug => oplog::LogLevel::Debug, + LogLevel::Info => oplog::LogLevel::Info, + LogLevel::Warn => oplog::LogLevel::Warn, + LogLevel::Error => oplog::LogLevel::Error, + LogLevel::Critical => oplog::LogLevel::Critical, + }, + context: context.clone(), + message: message.clone(), + }), + WorkerEvent::Close => None, + } + } +} + +impl Display for WorkerEvent { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + WorkerEvent::StdOut { bytes, .. } => { + write!( + f, + " {}", + String::from_utf8(bytes.clone()).unwrap_or_default() + ) + } + WorkerEvent::StdErr { bytes, .. } => { + write!( + f, + " {}", + String::from_utf8(bytes.clone()).unwrap_or_default() + ) + } + WorkerEvent::Log { + level, + context, + message, + .. + } => { + write!(f, " {:?} {} {}", level, context, message) + } + WorkerEvent::Close => { + write!(f, "") + } + } + } +} + +impl TryFrom for WorkerEvent { + type Error = String; + + fn try_from( + value: golem_api_grpc::proto::golem::worker::LogEvent, + ) -> Result { + match value.event { + Some(event) => match event { + golem_api_grpc::proto::golem::worker::log_event::Event::Stdout(event) => { + Ok(WorkerEvent::StdOut { + timestamp: event.timestamp.ok_or("Missing timestamp")?.into(), + bytes: event.message.into_bytes(), + }) + } + golem_api_grpc::proto::golem::worker::log_event::Event::Stderr(event) => { + Ok(WorkerEvent::StdErr { + timestamp: event.timestamp.ok_or("Missing timestamp")?.into(), + bytes: event.message.into_bytes(), + }) + } + golem_api_grpc::proto::golem::worker::log_event::Event::Log(event) => { + Ok(WorkerEvent::Log { + timestamp: event.timestamp.clone().ok_or("Missing timestamp")?.into(), + level: event.level().into(), + context: event.context, + message: event.message, + }) + } + }, + None => Err("Missing event".to_string()), + } + } +} + +impl TryFrom for golem_api_grpc::proto::golem::worker::LogEvent { + type Error = String; + + fn try_from(value: WorkerEvent) -> Result { + match value { + WorkerEvent::StdOut { timestamp, bytes } => Ok(golem::worker::LogEvent { + event: Some(golem::worker::log_event::Event::Stdout( + golem::worker::StdOutLog { + message: String::from_utf8_lossy(&bytes).to_string(), + timestamp: Some(timestamp.into()), + }, + )), + }), + WorkerEvent::StdErr { timestamp, bytes } => Ok(golem::worker::LogEvent { + event: Some( + golem_api_grpc::proto::golem::worker::log_event::Event::Stderr( + golem::worker::StdErrLog { + message: String::from_utf8_lossy(&bytes).to_string(), + timestamp: Some(timestamp.into()), + }, + ), + ), + }), + WorkerEvent::Log { + timestamp, + level, + context, + message, + } => Ok(golem::worker::LogEvent { + event: Some(golem::worker::log_event::Event::Log(golem::worker::Log { + level: match level { + LogLevel::Trace => golem::worker::Level::Trace.into(), + LogLevel::Debug => golem::worker::Level::Debug.into(), + LogLevel::Info => golem::worker::Level::Info.into(), + LogLevel::Warn => golem::worker::Level::Warn.into(), + LogLevel::Error => golem::worker::Level::Error.into(), + LogLevel::Critical => golem::worker::Level::Critical.into(), + }, + context, + message, + timestamp: Some(timestamp.into()), + })), + }), + WorkerEvent::Close => Err("Close event is not supported via protobuf".to_string()), + } + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/golem-test-framework/src/dsl/mod.rs b/golem-test-framework/src/dsl/mod.rs index deebb61f31..739ce2ba57 100644 --- a/golem-test-framework/src/dsl/mod.rs +++ b/golem-test-framework/src/dsl/mod.rs @@ -745,17 +745,31 @@ impl TestDsl for T { } } -pub fn stdout_event(s: &str) -> LogEvent { - LogEvent { - event: Some(log_event::Event::Stdout(StdOutLog { - message: s.to_string(), - })), +pub fn stdout_events(events: impl Iterator) -> Vec { + events + .flat_map(|event| match event { + LogEvent { + event: Some(log_event::Event::Stdout(StdOutLog { message, .. })), + } => Some(message), + _ => None, + }) + .collect() +} + +pub fn stdout_event_matching(event: &LogEvent, s: &str) -> bool { + if let LogEvent { + event: Some(log_event::Event::Stdout(StdOutLog { message, .. })), + } = event + { + message == s + } else { + false } } pub fn stdout_event_starting_with(event: &LogEvent, s: &str) -> bool { if let LogEvent { - event: Some(log_event::Event::Stdout(StdOutLog { message })), + event: Some(log_event::Event::Stdout(StdOutLog { message, .. })), } = event { message.starts_with(s) @@ -764,12 +778,15 @@ pub fn stdout_event_starting_with(event: &LogEvent, s: &str) -> bool { } } -pub fn stderr_event(s: &str) -> LogEvent { - LogEvent { - event: Some(log_event::Event::Stderr(StdErrLog { - message: s.to_string(), - })), - } +pub fn stderr_events(events: impl Iterator) -> Vec { + events + .flat_map(|event| match event { + LogEvent { + event: Some(log_event::Event::Stderr(StdErrLog { message, .. })), + } => Some(message), + _ => None, + }) + .collect() } pub fn log_event_to_string(event: &LogEvent) -> String { diff --git a/golem-worker-executor-base/Cargo.toml b/golem-worker-executor-base/Cargo.toml index dbf0055fda..c7727c0dcb 100644 --- a/golem-worker-executor-base/Cargo.toml +++ b/golem-worker-executor-base/Cargo.toml @@ -65,7 +65,7 @@ nonempty-collections = "0.2.5" prometheus = { workspace = true } prost = { workspace = true } rand = { workspace = true } -ringbuf = "0.3.3" +ringbuf = "0.4.1" rustls = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/golem-worker-executor-base/src/durable_host/io/streams.rs b/golem-worker-executor-base/src/durable_host/io/streams.rs index d5e0a86c6a..32b4d97d1c 100644 --- a/golem-worker-executor-base/src/durable_host/io/streams.rs +++ b/golem-worker-executor-base/src/durable_host/io/streams.rs @@ -23,9 +23,9 @@ use crate::durable_host::serialized::SerializableStreamError; use crate::durable_host::{Durability, DurableWorkerCtx, HttpRequestCloseOwner}; use crate::error::GolemError; use crate::metrics::wasm::record_host_function_call; -use crate::services::worker_event::WorkerEvent; use crate::workerctx::WorkerCtx; use golem_common::model::oplog::{OplogIndex, WrappedFunctionType}; +use golem_common::model::WorkerEvent; use wasmtime_wasi::bindings::io::streams::{ Host, HostInputStream, HostOutputStream, InputStream, OutputStream, Pollable, }; @@ -183,9 +183,9 @@ impl HostOutputStream for DurableWorkerCtx { let output = self.table().get(&self_)?; let event = if output.as_any().downcast_ref::().is_some() { - Some(WorkerEvent::StdOut(contents.clone())) + Some(WorkerEvent::stdout(contents.clone())) } else if output.as_any().downcast_ref::().is_some() { - Some(WorkerEvent::StdErr(contents.clone())) + Some(WorkerEvent::stderr(contents.clone())) } else { None }; diff --git a/golem-worker-executor-base/src/durable_host/logging/logging.rs b/golem-worker-executor-base/src/durable_host/logging/logging.rs index 773303e630..04fdd56162 100644 --- a/golem-worker-executor-base/src/durable_host/logging/logging.rs +++ b/golem-worker-executor-base/src/durable_host/logging/logging.rs @@ -15,9 +15,9 @@ use crate::durable_host::DurableWorkerCtx; use crate::metrics::wasm::record_host_function_call; use crate::preview2::wasi::logging::logging::{Host, Level}; -use crate::services::worker_event::{LogLevel, WorkerEvent}; use crate::workerctx::WorkerCtx; use async_trait::async_trait; +use golem_common::model::{LogLevel, WorkerEvent}; #[async_trait] impl Host for DurableWorkerCtx { @@ -33,11 +33,7 @@ impl Host for DurableWorkerCtx { Level::Debug => LogLevel::Debug, Level::Trace => LogLevel::Trace, }; - let event = WorkerEvent::Log { - level: log_level, - context, - message, - }; + let event = WorkerEvent::log(log_level, &context, &message); self.emit_log_event(event).await; Ok(()) } diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index 75bb456dc3..19f8505ab2 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -33,7 +33,7 @@ use crate::services::golem_config::GolemConfig; use crate::services::key_value::KeyValueService; use crate::services::promise::PromiseService; use crate::services::worker::WorkerService; -use crate::services::worker_event::{WorkerEvent, WorkerEventService}; +use crate::services::worker_event::WorkerEventService; use crate::services::{worker_enumeration, HasAll, HasConfig, HasOplog, HasWorker}; use crate::workerctx::{ ExternalOperations, IndexedResourceStore, InvocationHooks, InvocationManagement, @@ -50,8 +50,8 @@ use golem_common::model::oplog::{ use golem_common::model::regions::{DeletedRegions, OplogRegion}; use golem_common::model::{ AccountId, ComponentId, ComponentVersion, FailedUpdateRecord, IdempotencyKey, OwnedWorkerId, - ScanCursor, ScheduledAction, SuccessfulUpdateRecord, Timestamp, WorkerFilter, WorkerId, - WorkerMetadata, WorkerResourceDescription, WorkerStatus, WorkerStatusRecord, + ScanCursor, ScheduledAction, SuccessfulUpdateRecord, Timestamp, WorkerEvent, WorkerFilter, + WorkerId, WorkerMetadata, WorkerResourceDescription, WorkerStatus, WorkerStatusRecord, }; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::wasmtime::ResourceStore; diff --git a/golem-worker-executor-base/src/grpc.rs b/golem-worker-executor-base/src/grpc.rs index 9fd6346d23..b751bc0c44 100644 --- a/golem-worker-executor-base/src/grpc.rs +++ b/golem-worker-executor-base/src/grpc.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use futures_util::Stream; +use gethostname::gethostname; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; +use golem_wasm_rpc::protobuf::Val; use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; +use std::pin::Pin; use std::sync::Arc; - -use gethostname::gethostname; -use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::protobuf::Val; +use std::task::{Context, Poll}; use tokio::sync::broadcast::error::RecvError; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tonic::{Request, Response, Status}; use tracing::{debug, error, info, warn, Instrument}; use uuid::Uuid; @@ -43,23 +44,21 @@ use golem_common::grpc::{ proto_account_id_string, proto_component_id_string, proto_idempotency_key_string, proto_promise_id_string, proto_worker_id_string, }; -use golem_common::metrics::api::{ - record_closed_grpc_api_active_stream, record_new_grpc_api_active_stream, -}; +use golem_common::metrics::api::record_new_grpc_api_active_stream; use golem_common::model::oplog::UpdateDescription; use golem_common::model::{ AccountId, ComponentId, IdempotencyKey, OwnedWorkerId, ScanCursor, ShardId, - TimestampedWorkerInvocation, WorkerFilter, WorkerId, WorkerInvocation, WorkerMetadata, - WorkerStatus, WorkerStatusRecord, + TimestampedWorkerInvocation, WorkerEvent, WorkerFilter, WorkerId, WorkerInvocation, + WorkerMetadata, WorkerStatus, WorkerStatusRecord, }; use golem_common::{model as common_model, recorded_grpc_api_request}; use crate::model::{InterruptKind, LastError}; use crate::services::events::Event; use crate::services::worker_activator::{DefaultWorkerActivator, LazyWorkerActivator}; -use crate::services::worker_event::LogLevel; +use crate::services::worker_event::WorkerEventReceiver; use crate::services::{ - worker_event, All, HasActiveWorkers, HasAll, HasEvents, HasPromiseService, + All, HasActiveWorkers, HasAll, HasEvents, HasPromiseService, HasRunningWorkerEnumerationService, HasShardManagerService, HasShardService, HasWorkerEnumerationService, HasWorkerService, UsesAllDeps, }; @@ -146,7 +145,7 @@ impl + UsesAllDeps + Send + Sync + } type ResponseResult = Result, Status>; -type ResponseStream = ReceiverStream>; +type ResponseStream = WorkerEventStream; impl + UsesAllDeps + Send + Sync + 'static> WorkerExecutorImpl @@ -224,7 +223,7 @@ impl + UsesAllDeps + Send + Sync + fn ensure_worker_belongs_to_this_executor( &self, - worker_id: &common_model::WorkerId, + worker_id: &WorkerId, ) -> Result<(), GolemError> { self.shard_service().check_worker(worker_id) } @@ -990,116 +989,12 @@ impl + UsesAllDeps + Send + Sync + .await? .event_service(); - let mut receiver = event_service.receiver(); + let receiver = event_service.receiver(); info!("Client connected"); record_new_grpc_api_active_stream(); - // spawn and channel are required if you want handle "disconnect" functionality - // the `out_stream` will not be polled after client disconnect - let (tx, rx) = mpsc::channel(128); - - tokio::spawn( - async move { - while let Ok(item) = receiver.recv().await { - match item { - worker_event::WorkerEvent::Close => { - break; - } - worker_event::WorkerEvent::StdOut(line) => { - match tx - .send(Result::<_, Status>::Ok(golem::worker::LogEvent { - event: Some(golem::worker::log_event::Event::Stdout( - golem::worker::StdOutLog { - message: String::from_utf8(line).unwrap(), - }, - )), - })) - .await - { - Ok(_) => { - // item (server response) was queued to be send to client - } - Err(_item) => { - // output_stream was build from rx and both are dropped - break; - } - } - } - worker_event::WorkerEvent::StdErr(line) => { - match tx - .send(Result::<_, Status>::Ok(golem::worker::LogEvent { - event: Some(golem::worker::log_event::Event::Stderr( - golem::worker::StdErrLog { - message: String::from_utf8(line).unwrap(), - }, - )), - })) - .await - { - Ok(_) => { - // item (server response) was queued to be send to client - } - Err(_item) => { - // output_stream was build from rx and both are dropped - break; - } - } - } - worker_event::WorkerEvent::Log { - level, - context, - message, - } => match tx - .send(Result::<_, Status>::Ok(golem::worker::LogEvent { - event: Some(golem::worker::log_event::Event::Log( - golem::worker::Log { - level: match level { - LogLevel::Trace => { - golem::worker::Level::Trace.into() - } - LogLevel::Debug => { - golem::worker::Level::Debug.into() - } - LogLevel::Info => { - golem::worker::Level::Info.into() - } - LogLevel::Warn => { - golem::worker::Level::Warn.into() - } - LogLevel::Error => { - golem::worker::Level::Error.into() - } - LogLevel::Critical => { - golem::worker::Level::Critical.into() - } - }, - context, - message, - }, - )), - })) - .await - { - Ok(_) => { - // item (server response) was queued to be send to client - } - Err(_item) => { - // output_stream was build from rx and both are dropped - break; - } - }, - } - } - - record_closed_grpc_api_active_stream(); - info!("Client disconnected"); - } - .in_current_span(), - ); - - let output_stream = ReceiverStream::new(rx); - Ok(Response::new(output_stream)) + Ok(Response::new(WorkerEventStream::new(receiver))) } else { // We don't want 'connect' to resume interrupted workers Err(GolemError::Interrupted { @@ -1927,3 +1822,42 @@ pub fn authorised_grpc_request(request: T, access_token: &Uuid) -> Request ); req } + +pub struct WorkerEventStream { + inner: Pin> + Send>>, +} + +impl WorkerEventStream { + pub fn new(receiver: WorkerEventReceiver) -> Self { + WorkerEventStream { + inner: Box::pin(receiver.to_stream()), + } + } +} + +impl Drop for WorkerEventStream { + fn drop(&mut self) { + info!("Client disconnected"); + } +} + +impl Stream for WorkerEventStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let WorkerEventStream { inner } = self.get_mut(); + match inner.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(event))) => match &event { + WorkerEvent::Close => Poll::Ready(None), + WorkerEvent::StdOut { .. } => Poll::Ready(Some(Ok(event.try_into().unwrap()))), + WorkerEvent::StdErr { .. } => Poll::Ready(Some(Ok(event.try_into().unwrap()))), + WorkerEvent::Log { .. } => Poll::Ready(Some(Ok(event.try_into().unwrap()))), + }, + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => Poll::Ready(Some(Err( + Status::data_loss(format!("Lagged by {} events", n)), + ))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/golem-worker-executor-base/src/services/worker_event.rs b/golem-worker-executor-base/src/services/worker_event.rs index 25ae870f98..f532c831f4 100644 --- a/golem-worker-executor-base/src/services/worker_event.rs +++ b/golem-worker-executor-base/src/services/worker_event.rs @@ -13,118 +13,33 @@ // limitations under the License. use crate::metrics::events::{record_broadcast_event, record_event}; -use bincode::{Decode, Encode}; -use golem_common::model::oplog::OplogEntry; +use futures_util::{stream, StreamExt}; +use golem_common::model::{LogLevel, WorkerEvent}; +use ringbuf::storage::Heap; +use ringbuf::traits::{Consumer, Producer, Split}; use ringbuf::*; -use std::fmt::{Display, Formatter}; +use std::sync::{Arc, Mutex}; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::*; -#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] -pub enum LogLevel { - Trace, - Debug, - Info, - Warn, - Error, - Critical, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] -pub enum WorkerEvent { - StdOut(Vec), - StdErr(Vec), - Log { - level: LogLevel, - context: String, - message: String, - }, - Close, -} - -impl WorkerEvent { - pub fn as_oplog_entry(&self) -> Option { - match self { - WorkerEvent::StdOut(bytes) => Some(OplogEntry::log( - golem_common::model::oplog::LogLevel::Stdout, - String::new(), - String::from_utf8_lossy(bytes).to_string(), - )), - WorkerEvent::StdErr(bytes) => Some(OplogEntry::log( - golem_common::model::oplog::LogLevel::Stderr, - String::new(), - String::from_utf8_lossy(bytes).to_string(), - )), - WorkerEvent::Log { - level, - context, - message, - } => Some(OplogEntry::log( - match level { - LogLevel::Trace => golem_common::model::oplog::LogLevel::Trace, - LogLevel::Debug => golem_common::model::oplog::LogLevel::Debug, - LogLevel::Info => golem_common::model::oplog::LogLevel::Info, - LogLevel::Warn => golem_common::model::oplog::LogLevel::Warn, - LogLevel::Error => golem_common::model::oplog::LogLevel::Error, - LogLevel::Critical => golem_common::model::oplog::LogLevel::Critical, - }, - context.clone(), - message.clone(), - )), - WorkerEvent::Close => None, - } - } -} - -impl Display for WorkerEvent { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - WorkerEvent::StdOut(bytes) => { - write!( - f, - " {}", - String::from_utf8(bytes.clone()).unwrap_or_default() - ) - } - WorkerEvent::StdErr(bytes) => { - write!( - f, - " {}", - String::from_utf8(bytes.clone()).unwrap_or_default() - ) - } - WorkerEvent::Log { - level, - context, - message, - } => { - write!(f, " {:?} {} {}", level, context, message) - } - WorkerEvent::Close => { - write!(f, "") - } - } - } -} +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::Stream; /// Per-worker event stream pub trait WorkerEventService { fn emit_event(&self, event: WorkerEvent); - fn emit_stdout(&self, data: Vec) { - self.emit_event(WorkerEvent::StdOut(data)) + fn emit_stdout(&self, bytes: Vec) { + self.emit_event(WorkerEvent::stdout(bytes)) } - fn emit_stderr(&self, data: Vec) { - self.emit_event(WorkerEvent::StdErr(data)) + fn emit_stderr(&self, bytes: Vec) { + self.emit_event(WorkerEvent::stderr(bytes)) } fn emit_log(&self, log_level: LogLevel, context: &str, message: &str) { - self.emit_event(WorkerEvent::Log { - level: log_level, - context: context.to_string(), - message: message.to_string(), - }) + self.emit_event(WorkerEvent::log(log_level, context, message)) } fn receiver(&self) -> WorkerEventReceiver; @@ -142,19 +57,28 @@ impl WorkerEventReceiver { None => self.receiver.recv().await, } } + + pub fn to_stream(self) -> impl Stream> { + let Self { history, receiver } = self; + stream::iter(history.into_iter().map(Ok)).chain(BroadcastStream::new(receiver)) + } } pub struct WorkerEventServiceDefault { sender: Sender, - ring: HeapRb, + ring_prod: Arc> as Split>::Prod>>, + ring_cons: Arc> as Split>::Cons>>, } impl WorkerEventServiceDefault { pub fn new(channel_capacity: usize, ring_capacity: usize) -> WorkerEventServiceDefault { let (tx, _) = channel(channel_capacity); - let ring = HeapRb::new(ring_capacity); - // ring.sub - WorkerEventServiceDefault { sender: tx, ring } + let (ring_prod, ring_cons) = HeapRb::new(ring_capacity).split(); + WorkerEventServiceDefault { + sender: tx, + ring_prod: Arc::new(Mutex::new(ring_prod)), + ring_cons: Arc::new(Mutex::new(ring_cons)), + } } } @@ -173,20 +97,26 @@ impl WorkerEventService for WorkerEventServiceDefault { let _ = self.sender.send(event.clone()); } - let _ = unsafe { Producer::new(&self.ring) }.push(event); + + let mut ring_prod = self.ring_prod.lock().unwrap(); + while ring_prod.try_push(event.clone()).is_err() { + let mut ring_cons = self.ring_cons.lock().unwrap(); + let _ = ring_cons.try_pop(); + } } fn receiver(&self) -> WorkerEventReceiver { let receiver = self.sender.subscribe(); - let history = self.ring.iter().cloned().rev().collect(); + let ring_cons = self.ring_cons.lock().unwrap(); + let history = ring_cons.iter().cloned().collect(); WorkerEventReceiver { history, receiver } } } fn label(event: &WorkerEvent) -> &'static str { match event { - WorkerEvent::StdOut(_) => "stdout", - WorkerEvent::StdErr(_) => "stderr", + WorkerEvent::StdOut { .. } => "stdout", + WorkerEvent::StdErr { .. } => "stderr", WorkerEvent::Log { .. } => "log", WorkerEvent::Close => "close", } @@ -224,7 +154,7 @@ mod tests { }); for b in 1..5u8 { - svc.emit_event(WorkerEvent::StdOut(vec![b])); + svc.emit_event(WorkerEvent::stdout(vec![b])); } let svc2 = svc.clone(); @@ -243,7 +173,7 @@ mod tests { }); for b in 5..9u8 { - svc.emit_event(WorkerEvent::StdOut(vec![b])); + svc.emit_event(WorkerEvent::stdout(vec![b])); } drop(svc); @@ -257,23 +187,23 @@ mod tests { assert_eq!( result1 == vec![ - WorkerEvent::StdOut(vec![1]), - WorkerEvent::StdOut(vec![2]), - WorkerEvent::StdOut(vec![3]), - WorkerEvent::StdOut(vec![5]), - WorkerEvent::StdOut(vec![6]), - WorkerEvent::StdOut(vec![7]), - WorkerEvent::StdOut(vec![8]), + WorkerEvent::stdout(vec![1]), + WorkerEvent::stdout(vec![2]), + WorkerEvent::stdout(vec![3]), + WorkerEvent::stdout(vec![5]), + WorkerEvent::stdout(vec![6]), + WorkerEvent::stdout(vec![7]), + WorkerEvent::stdout(vec![8]), ], result2 == vec![ - WorkerEvent::StdOut(vec![1]), - WorkerEvent::StdOut(vec![2]), - WorkerEvent::StdOut(vec![3]), - WorkerEvent::StdOut(vec![5]), - WorkerEvent::StdOut(vec![6]), - WorkerEvent::StdOut(vec![7]), - WorkerEvent::StdOut(vec![8]), + WorkerEvent::stdout(vec![1]), + WorkerEvent::stdout(vec![2]), + WorkerEvent::stdout(vec![3]), + WorkerEvent::stdout(vec![5]), + WorkerEvent::stdout(vec![6]), + WorkerEvent::stdout(vec![7]), + WorkerEvent::stdout(vec![8]), ] ) } @@ -301,7 +231,7 @@ mod tests { for b in 1..1001 { let s = format!("{}", b); - svc.emit_event(WorkerEvent::StdOut(s.as_bytes().into())); + svc.emit_event(WorkerEvent::stdout(s.as_bytes().into())); } let svc2 = svc.clone(); @@ -321,7 +251,7 @@ mod tests { for b in 1001..1005 { let s = format!("{}", b); - svc.emit_event(WorkerEvent::StdOut(s.as_bytes().into())); + svc.emit_event(WorkerEvent::stdout(s.as_bytes().into())); } drop(svc); @@ -336,14 +266,14 @@ mod tests { result1.len() == 1004, result2 == vec![ - WorkerEvent::StdOut("997".as_bytes().into()), - WorkerEvent::StdOut("998".as_bytes().into()), - WorkerEvent::StdOut("999".as_bytes().into()), - WorkerEvent::StdOut("1000".as_bytes().into()), - WorkerEvent::StdOut("1001".as_bytes().into()), - WorkerEvent::StdOut("1002".as_bytes().into()), - WorkerEvent::StdOut("1003".as_bytes().into()), - WorkerEvent::StdOut("1004".as_bytes().into()), + WorkerEvent::stdout("997".as_bytes().into()), + WorkerEvent::stdout("998".as_bytes().into()), + WorkerEvent::stdout("999".as_bytes().into()), + WorkerEvent::stdout("1000".as_bytes().into()), + WorkerEvent::stdout("1001".as_bytes().into()), + WorkerEvent::stdout("1002".as_bytes().into()), + WorkerEvent::stdout("1003".as_bytes().into()), + WorkerEvent::stdout("1004".as_bytes().into()), ] ) } diff --git a/golem-worker-executor-base/tests/api.rs b/golem-worker-executor-base/tests/api.rs index 6cff9f2f14..f29c540e15 100644 --- a/golem-worker-executor-base/tests/api.rs +++ b/golem-worker-executor-base/tests/api.rs @@ -39,7 +39,8 @@ use crate::common::{start, TestContext, TestWorkerExecutor}; use golem_common::model::oplog::{IndexedResourceKey, OplogIndex, WorkerResourceId}; use golem_test_framework::config::TestDependencies; use golem_test_framework::dsl::{ - drain_connection, is_worker_execution_error, stdout_event, worker_error_message, TestDslUnsafe, + drain_connection, is_worker_execution_error, stdout_event_matching, stdout_events, + worker_error_message, TestDslUnsafe, }; use tokio::time::sleep; use tonic::transport::Body; @@ -118,7 +119,7 @@ async fn simulated_crash() { ); check!(result.is_ok()); check!(result == Ok(vec![Value::String("done".to_string())])); - check!(events == vec![stdout_event("Starting interruption test\n"),]); + check!(stdout_events(events.into_iter()) == vec!["Starting interruption test\n"]); check!(elapsed.as_secs() < 13); } @@ -1598,8 +1599,14 @@ async fn long_running_poll_loop_connection_breaks_on_interrupt() { drop(executor); http_server.abort(); - check!(events.contains(&Some(stdout_event("Calling the poll endpoint\n")))); - check!(events.contains(&Some(stdout_event("Received initial\n")))); + check!(events + .iter() + .flatten() + .any(|e| stdout_event_matching(e, "Calling the poll endpoint\n"))); + check!(events + .iter() + .flatten() + .any(|e| stdout_event_matching(e, "Received initial\n"))); } #[tokio::test] @@ -1748,9 +1755,15 @@ async fn long_running_poll_loop_connection_can_be_restored_after_resume() { check!(status2.last_known_status.status == WorkerStatus::Interrupted); check!(status3.last_known_status.status == WorkerStatus::Running); check!(status4.last_known_status.status == WorkerStatus::Idle); - check!(events.contains(&stdout_event("Calling the poll endpoint\n"))); - check!(events.contains(&stdout_event("Received initial\n"))); - check!(events.contains(&stdout_event("Poll loop finished\n"))); + check!(events + .iter() + .any(|e| stdout_event_matching(e, "Calling the poll endpoint\n"))); + check!(events + .iter() + .any(|e| stdout_event_matching(e, "Received initial\n"))); + check!(events + .iter() + .any(|e| stdout_event_matching(e, "Poll loop finished\n"))); } #[tokio::test] diff --git a/golem-worker-executor-base/tests/transactions.rs b/golem-worker-executor-base/tests/transactions.rs index 9f88410820..cf02519a83 100644 --- a/golem-worker-executor-base/tests/transactions.rs +++ b/golem-worker-executor-base/tests/transactions.rs @@ -16,7 +16,8 @@ use crate::common::{start, TestContext}; use assert2::check; use bytes::Bytes; use golem_test_framework::dsl::{ - drain_connection, stdout_event, stdout_event_starting_with, worker_error_message, TestDslUnsafe, + drain_connection, stdout_event_starting_with, stdout_events, worker_error_message, + TestDslUnsafe, }; use golem_wasm_rpc::Value; use http_02::{Response, StatusCode}; @@ -161,15 +162,15 @@ async fn jump() { check!(result == vec![Value::U64(5)]); check!( - events + stdout_events(events.into_iter().flatten()) == vec![ - Some(stdout_event("started: 0\n")), - Some(stdout_event("second: 2\n")), - Some(stdout_event("second: 2\n")), - Some(stdout_event("third: 3\n")), - Some(stdout_event("fourth: 4\n")), - Some(stdout_event("fourth: 4\n")), - Some(stdout_event("fifth: 5\n")), + "started: 0\n", + "second: 2\n", + "second: 2\n", + "third: 3\n", + "fourth: 4\n", + "fourth: 4\n", + "fifth: 5\n", ] ); } diff --git a/golem-worker-executor-base/tests/wasi.rs b/golem-worker-executor-base/tests/wasi.rs index 9387aed049..735c201e6b 100644 --- a/golem-worker-executor-base/tests/wasi.rs +++ b/golem-worker-executor-base/tests/wasi.rs @@ -22,7 +22,7 @@ use crate::common::{start, TestContext}; use assert2::{assert, check}; use golem_common::model::{IdempotencyKey, WorkerStatus}; use golem_test_framework::dsl::{ - drain_connection, stderr_event, stdout_event, worker_error_message, TestDslUnsafe, + drain_connection, stderr_events, stdout_events, worker_error_message, TestDslUnsafe, }; use golem_wasm_rpc::Value; use http_02::{Response, StatusCode}; @@ -51,7 +51,7 @@ async fn write_stdout() { drop(executor); - check!(events == vec![stdout_event("Sample text written to the output\n")]); + check!(stdout_events(events.into_iter()) == vec!["Sample text written to the output\n"]); } #[tokio::test] @@ -73,7 +73,7 @@ async fn write_stderr() { drop(executor); - check!(events == vec![stderr_event("Sample text written to the error output\n")]); + check!(stderr_events(events.into_iter()) == vec!["Sample text written to the error output\n"]); } #[tokio::test] diff --git a/golem-worker-service-base/src/service/worker/connect_proxy.rs b/golem-worker-service-base/src/service/worker/connect_proxy.rs index 7eaccf5f49..a1b9feeebb 100644 --- a/golem-worker-service-base/src/service/worker/connect_proxy.rs +++ b/golem-worker-service-base/src/service/worker/connect_proxy.rs @@ -18,11 +18,12 @@ use std::{ }; use futures::{Sink, SinkExt, Stream, StreamExt}; -use poem::web::websocket::Message; -use tonic::Status; - use golem_api_grpc::proto::golem::worker::LogEvent; +use golem_common::model::WorkerEvent; use golem_service_base::model::WorkerId; +use poem::web::websocket::Message; +use tonic::Status; +use tracing::{error, info}; /// Proxies a worker connection, listening for either connection to close. Websocket sink will be closed at the end. /// @@ -37,7 +38,7 @@ pub async fn proxy_worker_connection( keep_alive_interval: Duration, max_pong_timeout: Duration, ) -> Result<(), ConnectProxyError> { - tracing::info!("Proxying worker connection"); + info!("Proxying worker connection"); let mut websocket = keep_alive::WebSocketKeepAlive::from_sink_and_stream( websocket_receiver, @@ -53,7 +54,7 @@ pub async fn proxy_worker_connection( websocket_message = websocket.next() => { match websocket_message { Some(Ok(Message::Close(payload))) => { - tracing::info!( + info!( close_code=payload.as_ref().map(|p| u16::from(p.0)), close_message=payload.as_ref().map(|p| &p.1), "Client closed WebSocket connection", @@ -62,12 +63,13 @@ pub async fn proxy_worker_connection( } Some(Err(error)) => { let error: ConnectProxyError = error.into(); - tracing::info!(error=error.to_string(), "Received WebSocket Error"); + info!(error=error.to_string(), "Received WebSocket Error"); break Err(error); }, - Some(Ok(_)) => {} + Some(Ok(_)) => { + } None => { - tracing::info!("WebSocket connection closed"); + info!("WebSocket connection closed"); break Ok(()); } } @@ -76,24 +78,26 @@ pub async fn proxy_worker_connection( worker_message = worker_stream.next() => { if let Some(message) = worker_message { if let Err(error) = forward_worker_message(message, &mut websocket).await { - tracing::info!(error=error.to_string(), "Error forwarding message to WebSocket client"); - break(Err(error)) + info!(error=error.to_string(), "Error forwarding message to WebSocket client"); + break Err(error) + } } else { - tracing::info!("Worker stream ended"); + info!("Worker stream ended"); break Ok(()); } }, } }; + info!("Closing websocket connection"); if let Err(error) = websocket.close().await { - tracing::error!( + error!( error = error.to_string(), "Error closing WebSocket connection" ); } else { - tracing::info!("WebSocket connection successfully closed"); + info!("WebSocket connection successfully closed"); } result @@ -106,7 +110,7 @@ async fn forward_worker_message( where ConnectProxyError: From, { - let message = message?; + let message: WorkerEvent = message?.try_into().map_err(ConnectProxyError::Proto)?; let msg_json = serde_json::to_string(&message)?; socket.send(Message::Text(msg_json)).await?; Ok(()) @@ -120,6 +124,9 @@ pub enum ConnectProxyError { #[error(transparent)] Json(#[from] serde_json::Error), + #[error("Internal protocol error: {0}")] + Proto(String), + #[error(transparent)] Tonic(#[from] tonic::Status), @@ -151,6 +158,7 @@ mod keep_alive { use futures::{Future, Sink, SinkExt, Stream, StreamExt}; use poem::web::websocket::Message; use tokio::time::Instant; + use tracing::debug; pub struct WebSocketKeepAlive { sink: A, @@ -220,36 +228,36 @@ mod keep_alive { if (self.pong_timeout.as_mut().poll(cx).is_ready() || self.pong_timeout.is_elapsed()) && self.last_ping.is_some() { - tracing::debug!("Ping confirmation timed out"); + debug!("Ping confirmation timed out"); return Poll::Ready(Some(Err(KeepAliveError::Timeout))); } if self.ping_interval.poll_tick(cx).is_ready() && self.last_ping.is_none() { - tracing::debug!("Initiating WebSocket Ping"); + debug!("Initiating WebSocket Ping"); let sink_ready = self.sink.poll_ready_unpin(cx).map_err(|e| { - tracing::debug!("Error polling sink readiness"); + debug!("Error polling sink readiness"); KeepAliveError::Sink(e) })?; if sink_ready.is_pending() { - tracing::debug!("Waiting for sink to be ready"); + debug!("Waiting for sink to be ready"); return Poll::Pending; } self.sink .start_send_unpin(Message::Ping(Vec::new())) .map_err(|e| { - tracing::debug!("Error sending WebSocket Ping"); + debug!("Error sending WebSocket Ping"); KeepAliveError::Sink(e) })?; let _ = self.sink.poll_flush_unpin(cx).map_err(|e| { - tracing::debug!("Error flushing WebSocket Ping"); + debug!("Error flushing WebSocket Ping"); KeepAliveError::Sink(e) })?; - tracing::debug!("WebSocket Ping sent"); + debug!("WebSocket Ping sent"); let now = Instant::now(); let timeout = now + self.max_pong_timeout; @@ -260,7 +268,7 @@ mod keep_alive { match self.stream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(Message::Pong(pong)))) => { - tracing::debug!("Received WebSocket confirmation Pong"); + debug!("Received WebSocket confirmation Pong"); self.last_ping = None; self.ping_interval.as_mut().reset(); @@ -314,20 +322,20 @@ mod keep_alive { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; + use super::*; use poem::web::websocket::Message; use tokio::sync::mpsc; use tokio::time::{timeout, Duration}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; - - use super::*; + use tracing::Level; static TRACING_SETUP: Once = Once::new(); fn setup_tracing() { TRACING_SETUP.call_once(|| { let subscriber = tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::DEBUG) + .with_max_level(Level::DEBUG) .finish(); tracing::subscriber::set_global_default(subscriber) diff --git a/golem-worker-service-base/src/service/worker/connect_stream.rs b/golem-worker-service-base/src/service/worker/connect_stream.rs index 0a43f0c342..0421f7fc6a 100644 --- a/golem-worker-service-base/src/service/worker/connect_stream.rs +++ b/golem-worker-service-base/src/service/worker/connect_stream.rs @@ -21,7 +21,7 @@ use futures::{Stream, StreamExt}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tonic::{Status, Streaming}; -use tracing::Instrument; +use tracing::{error, Instrument}; use golem_api_grpc::proto::golem::worker::LogEvent; use golem_common::metrics::api::{ @@ -41,42 +41,38 @@ impl ConnectWorkerStream { let mut streaming = streaming; let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); - tokio::spawn({ - record_new_grpc_api_active_stream(); - - let cancel = cancel.clone(); + tokio::spawn( + async move { + record_new_grpc_api_active_stream(); - let forward_loop = { - let sender = sender.clone(); - async move { - while let Some(message) = streaming.next().await { - if let Err(error) = sender.send(message).await { - tracing::error!( - error = error.to_string(), - "Failed to forward WorkerStream" - ); + loop { + tokio::select! { + _ = cancel_clone.cancelled() => { break; } + message = streaming.next() => { + if let Some(message) = message { + if let Err(error) = sender.send(message).await { + error!( + error = error.to_string(), + "Failed to forward WorkerStream" + ); + break; + } + } else { + break; + } + } } } - .in_current_span() - }; - async move { - tokio::select! { - _ = cancel.cancelled() => { - tracing::info!("WorkerStream cancelled"); - } - _ = forward_loop => { - tracing::info!("WorkerStream forward loop finished"); - } - } - sender.closed().await; + drop(sender); record_closed_grpc_api_active_stream(); } - .in_current_span() - }); + .in_current_span(), + ); Self { receiver, cancel } } diff --git a/golem-worker-service/src/api/worker.rs b/golem-worker-service/src/api/worker.rs index 6669445a81..bd71a78ee5 100644 --- a/golem-worker-service/src/api/worker.rs +++ b/golem-worker-service/src/api/worker.rs @@ -8,7 +8,6 @@ use golem_service_base::model::*; use golem_worker_service_base::api::WorkerApiBaseError; use poem_openapi::param::{Header, Path, Query}; use poem_openapi::payload::Json; -use poem_openapi::types::ToJSON; use poem_openapi::*; use std::str::FromStr; use tap::TapFallible; @@ -147,11 +146,6 @@ impl WorkerApi { .await .map_err(|e| e.into()) .map(|result| Json(InvokeResult { result })); - - println!( - "RETURNING RESPONSE JSON: {}", - response.clone().unwrap().to_json_string() - ); record.result(response) } diff --git a/golem-worker-service/src/api/worker_connect.rs b/golem-worker-service/src/api/worker_connect.rs index 44f4a1c61b..503c7e4bad 100644 --- a/golem-worker-service/src/api/worker_connect.rs +++ b/golem-worker-service/src/api/worker_connect.rs @@ -46,7 +46,7 @@ pub async fn ws( websocket: WebSocket, Data(service): Data<&ConnectService>, ) -> Response { - get_worker_stream(service, component_id, worker_name) + connect_to_worker(service, component_id, worker_name) .await .map(|(worker_id, worker_stream)| { websocket @@ -72,7 +72,7 @@ pub async fn ws( const PING_INTERVAL: Duration = Duration::from_secs(30); const PING_TIMEOUT: Duration = Duration::from_secs(15); -async fn get_worker_stream( +async fn connect_to_worker( service: &ConnectService, component_id: ComponentId, worker_name: String, diff --git a/test-components/build-components.sh b/test-components/build-components.sh index dc285cfd80..a846b2893c 100755 --- a/test-components/build-components.sh +++ b/test-components/build-components.sh @@ -2,7 +2,7 @@ rust_test_components=("write-stdout" "write-stderr" "read-stdin" "clocks" "shopping-cart" "file-write-read-delete" "file-service" "http-client" "directories" "environment-service" "promise" "interruption" "clock-service" "option-service" "flags-service" "http-client-2" "stdio-cc" "failing-component" "variant-service" "key-value-service" "blob-store-service" "runtime-service" "networking" "shopping-cart-resource" -"update-test-v1" "update-test-v2" "update-test-v3" "update-test-v4" "rust-echo" "golem-rust-tests" "durability-overhead") +"update-test-v1" "update-test-v2" "update-test-v3" "update-test-v4" "rust-echo" "golem-rust-tests" "durability-overhead" "logging") zig_test_components=("zig-3") tinygo_test_components=("tinygo-wasi" "tinygo-wasi-http") grain_test_components=("grain-1") diff --git a/test-components/logging/.vscode/settings.json b/test-components/logging/.vscode/settings.json new file mode 100644 index 0000000000..7530c05d7b --- /dev/null +++ b/test-components/logging/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.server.extraEnv": { "CARGO": "cargo-component" } +} \ No newline at end of file diff --git a/test-components/logging/Cargo.lock b/test-components/logging/Cargo.lock new file mode 100644 index 0000000000..6d294f934f --- /dev/null +++ b/test-components/logging/Cargo.lock @@ -0,0 +1,1403 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "anyhow" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" + +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "backtrace" +version = "0.3.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" + +[[package]] +name = "cc" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e8aabfac534be767c909e0690571677d49f41bd8465ae876fe043d52ba5292" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "gimli" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" + +[[package]] +name = "golem-rust" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3eee21493a21f3bbdfdc79ad10fff2e72c20f12a7c24d017ad69c5bcf656127" +dependencies = [ + "golem-rust-macro", + "serde", + "serde_json", + "uuid", + "wit-bindgen-rt 0.26.0", +] + +[[package]] +name = "golem-rust-macro" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcff0f7297bf27c13ba6d840aff62f7081b1826d89cd025a9ff06d652521d522" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "0.14.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + +[[package]] +name = "id-arena" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" + +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "indexmap" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +dependencies = [ + "equivalent", + "hashbrown", + "serde", +] + +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "leb128" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" + +[[package]] +name = "libc" +version = "0.2.155" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" + +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + +[[package]] +name = "logging" +version = "0.0.1" +dependencies = [ + "golem-rust", + "golem-rust-macro", + "log", + "rand", + "reqwest", + "serde", + "serde_json", + "wasi-logger", + "wit-bindgen-rt 0.26.0", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys 0.52.0", +] + +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + +[[package]] +name = "object" +version = "0.36.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "openssl" +version = "0.10.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "reqwest" +version = "0.11.18" +source = "git+https://github.com/zivergetech/reqwest?branch=update-jun-2024#1cf59c67b93aa6292961f8948b93df5bca2753b6" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", + "wit-bindgen-rt 0.26.0", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "rustix" +version = "0.38.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + +[[package]] +name = "serde" +version = "1.0.206" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b3e4cd94123dd520a128bcd11e34d9e9e423e7e3e50425cb1b4b1e3549d0284" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.206" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fabfb6138d2383ea8208cf98ccf69cdfb1aff4088460681d84189aa259762f97" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.124" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66ad62847a56b3dba58cc891acd13884b9c61138d330c0d7b6181713d4fce38d" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "spdx" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47317bbaf63785b53861e1ae2d11b80d6b624211d42cb20efcd210ee6f8a14bc" +dependencies = [ + "smallvec", +] + +[[package]] +name = "syn" +version = "2.0.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fceb41e3d546d0bd83421d3409b1460cc7444cd389341a4c880fe7a042cb3d7" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tempfile" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" +dependencies = [ + "cfg-if", + "fastrand", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + +[[package]] +name = "tinyvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + +[[package]] +name = "tokio" +version = "1.39.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "pin-project-lite", + "socket2", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + +[[package]] +name = "unicode-bidi" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "unicode-normalization" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + +[[package]] +name = "unicode-xid" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" + +[[package]] +name = "url" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "wasi-logger" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58aa5201b7f5d96ef2e747a1f60a6dbc38bdd1287ce5e046d1498bd7a793f74b" +dependencies = [ + "log", + "wit-bindgen", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" + +[[package]] +name = "wasm-encoder" +version = "0.202.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfd106365a7f5f7aa3c1916a98cbb3ad477f5ff96ddb130285a91c6e7429e67a" +dependencies = [ + "leb128", +] + +[[package]] +name = "wasm-metadata" +version = "0.202.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "094aea3cb90e09f16ee25a4c0e324b3e8c934e7fd838bfa039aef5352f44a917" +dependencies = [ + "anyhow", + "indexmap", + "serde", + "serde_derive", + "serde_json", + "spdx", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.202.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6998515d3cf3f8b980ef7c11b29a9b1017d4cf86b99ae93b546992df9931413" +dependencies = [ + "bitflags", + "indexmap", + "semver", +] + +[[package]] +name = "web-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + +[[package]] +name = "wit-bindgen" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb4e7653763780be47e38f479e9aa83c768aa6a3b2ed086dc2826fdbbb7e7f5" +dependencies = [ + "wit-bindgen-rt 0.24.0", + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b67e11c950041849a10828c7600ea62a4077c01e8af72e8593253575428f91b" +dependencies = [ + "anyhow", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rt" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0780cf7046630ed70f689a098cd8d56c5c3b22f2a7379bbdb088879963ff96" +dependencies = [ + "bitflags", +] + +[[package]] +name = "wit-bindgen-rt" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c7526379ace8709ee9ab9f2bb50f112d95581063a59ef3097d9c10153886c9" +dependencies = [ + "bitflags", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30acbe8fb708c3a830a33c4cb705df82659bf831b492ec6ca1a17a369cfeeafb" +dependencies = [ + "anyhow", + "heck 0.4.1", + "indexmap", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b1b06eae85feaecdf9f2854f7cac124e00d5a6e5014bfb02eb1ecdeb5f265b9" +dependencies = [ + "anyhow", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.202.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c836b1fd9932de0431c1758d8be08212071b6bba0151f7bac826dbc4312a2a9" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.202.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744237b488352f4f27bca05a10acb79474415951c450e52ebd0da784c1df2bcc" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/test-components/logging/Cargo.toml b/test-components/logging/Cargo.toml new file mode 100644 index 0000000000..661bed3f45 --- /dev/null +++ b/test-components/logging/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "logging" +version = "0.0.1" +edition = "2021" + +[lib] +path = "src/lib.rs" +crate-type = ["cdylib"] + +[profile.release] +lto = true +opt-level = 's' + +[dependencies] +wit-bindgen-rt = { version = "0.26.0", features = ["bitflags"] } + +golem-rust = { version = "0.4.0" } +golem-rust-macro = { version = "0.4.0" } +log = { version = "0.4.22", features = ["kv"] } +rand = "0.8.5" +reqwest = { git = "https://github.com/zivergetech/reqwest", branch = "update-jun-2024", features = [ + "json", +] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +wasi-logger = { version = "0.1.2", features = ["kv"] } + +[package.metadata.component.target] +path = "wit" + +[package.metadata.component.target.dependencies] diff --git a/test-components/logging/src/bindings.rs b/test-components/logging/src/bindings.rs new file mode 100644 index 0000000000..d0b85aac21 --- /dev/null +++ b/test-components/logging/src/bindings.rs @@ -0,0 +1,132 @@ +// Generated by `wit-bindgen` 0.25.0. DO NOT EDIT! +// Options used: +#[allow(dead_code)] +pub mod exports { + #[allow(dead_code)] + pub mod golem { + #[allow(dead_code)] + pub mod it { + #[allow(dead_code, clippy::all)] + pub mod api { + #[used] + #[doc(hidden)] + #[cfg(target_arch = "wasm32")] + static __FORCE_SECTION_REF: fn() = + super::super::super::super::__link_custom_section_describing_imports; + use super::super::super::super::_rt; + #[doc(hidden)] + #[allow(non_snake_case)] + pub unsafe fn _export_init_cabi() { + #[cfg(target_arch = "wasm32")] + _rt::run_ctors_once(); + T::init(); + } + #[doc(hidden)] + #[allow(non_snake_case)] + pub unsafe fn _export_forever_random_entries_cabi() { + #[cfg(target_arch = "wasm32")] + _rt::run_ctors_once(); + T::forever_random_entries(); + } + #[doc(hidden)] + #[allow(non_snake_case)] + pub unsafe fn _export_some_random_entries_cabi() { + #[cfg(target_arch = "wasm32")] + _rt::run_ctors_once(); + T::some_random_entries(); + } + #[doc(hidden)] + #[allow(non_snake_case)] + pub unsafe fn _export_forever_random_entries_with_log_cabi() { + #[cfg(target_arch = "wasm32")] + _rt::run_ctors_once(); + T::forever_random_entries_with_log(); + } + pub trait Guest { + fn init(); + fn forever_random_entries(); + fn some_random_entries(); + fn forever_random_entries_with_log(); + } + #[doc(hidden)] + + macro_rules! __export_golem_it_api_cabi{ + ($ty:ident with_types_in $($path_to_types:tt)*) => (const _: () = { + + #[export_name = "golem:it/api#init"] + unsafe extern "C" fn export_init() { + $($path_to_types)*::_export_init_cabi::<$ty>() + } + #[export_name = "golem:it/api#forever-random-entries"] + unsafe extern "C" fn export_forever_random_entries() { + $($path_to_types)*::_export_forever_random_entries_cabi::<$ty>() + } + #[export_name = "golem:it/api#some-random-entries"] + unsafe extern "C" fn export_some_random_entries() { + $($path_to_types)*::_export_some_random_entries_cabi::<$ty>() + } + #[export_name = "golem:it/api#forever-random-entries-with-log"] + unsafe extern "C" fn export_forever_random_entries_with_log() { + $($path_to_types)*::_export_forever_random_entries_with_log_cabi::<$ty>() + } + };); +} + #[doc(hidden)] + pub(crate) use __export_golem_it_api_cabi; + } + } + } +} +mod _rt { + + #[cfg(target_arch = "wasm32")] + pub fn run_ctors_once() { + wit_bindgen_rt::run_ctors_once(); + } +} + +/// Generates `#[no_mangle]` functions to export the specified type as the +/// root implementation of all generated traits. +/// +/// For more information see the documentation of `wit_bindgen::generate!`. +/// +/// ```rust +/// # macro_rules! export{ ($($t:tt)*) => (); } +/// # trait Guest {} +/// struct MyType; +/// +/// impl Guest for MyType { +/// // ... +/// } +/// +/// export!(MyType); +/// ``` +#[allow(unused_macros)] +#[doc(hidden)] + +macro_rules! __export_logging_impl { + ($ty:ident) => (self::export!($ty with_types_in self);); + ($ty:ident with_types_in $($path_to_types_root:tt)*) => ( + $($path_to_types_root)*::exports::golem::it::api::__export_golem_it_api_cabi!($ty with_types_in $($path_to_types_root)*::exports::golem::it::api); + ) +} +#[doc(inline)] +pub(crate) use __export_logging_impl as export; + +#[cfg(target_arch = "wasm32")] +#[link_section = "component-type:wit-bindgen:0.25.0:logging:encoded world"] +#[doc(hidden)] +pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 274] = *b"\ +\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\x94\x01\x01A\x02\x01\ +A\x02\x01B\x05\x01@\0\x01\0\x04\0\x04init\x01\0\x04\0\x16forever-random-entries\x01\ +\0\x04\0\x13some-random-entries\x01\0\x04\0\x1fforever-random-entries-with-log\x01\ +\0\x04\x01\x0cgolem:it/api\x05\0\x04\x01\x10golem:it/logging\x04\0\x0b\x0d\x01\0\ +\x07logging\x03\0\0\0G\x09producers\x01\x0cprocessed-by\x02\x0dwit-component\x07\ +0.208.1\x10wit-bindgen-rust\x060.25.0"; + +#[inline(never)] +#[doc(hidden)] +#[cfg(target_arch = "wasm32")] +pub fn __link_custom_section_describing_imports() { + wit_bindgen_rt::maybe_link_cabi_realloc(); +} diff --git a/test-components/logging/src/lib.rs b/test-components/logging/src/lib.rs new file mode 100644 index 0000000000..5a5c669945 --- /dev/null +++ b/test-components/logging/src/lib.rs @@ -0,0 +1,114 @@ +mod bindings; + +use crate::bindings::exports::golem::it::api::*; +use rand::Rng; +use std::thread::sleep; +use std::time::Duration; + +struct Component; + +impl Guest for Component { + fn init() { + wasi_logger::Logger::install().expect("failed to install wasi_logger::Logger"); + log::set_max_level(log::LevelFilter::Trace); + } + + fn forever_random_entries() { + let mut rng = rand::thread_rng(); + let mut idx = 0; + loop { + let x: u32 = rng.gen(); + Self::print(idx, x); + sleep(Duration::from_millis(500)); + idx += 1; + } + } + + fn forever_random_entries_with_log() { + let mut rng = rand::thread_rng(); + let mut idx = 0; + loop { + let x: u32 = rng.gen(); + Self::log(idx, x); + sleep(Duration::from_millis(500)); + idx += 1; + } + } + + fn some_random_entries() { + let mut rng = rand::thread_rng(); + let mut idx = 0; + while idx < 100 { + let x: u32 = rng.gen(); + Self::print(idx, x); + if idx % 10 == 0 { + sleep(Duration::from_millis(100)); + } + idx += 1; + } + } +} + +impl Component { + fn print(idx: i32, x: u32) { + match idx % 8 { + 0 => { + println!("{idx}. message: {x}"); + } + 1 => { + eprintln!("{idx}. message: {x}"); + } + 2 => golem_rust::bindings::wasi::logging::logging::log( + golem_rust::bindings::wasi::logging::logging::Level::Critical, + &format!("{idx}"), + &format!("{x}"), + ), + 3 => golem_rust::bindings::wasi::logging::logging::log( + golem_rust::bindings::wasi::logging::logging::Level::Error, + &format!("{idx}"), + &format!("{x}"), + ), + 4 => golem_rust::bindings::wasi::logging::logging::log( + golem_rust::bindings::wasi::logging::logging::Level::Debug, + &format!("{idx}"), + &format!("{x}"), + ), + 5 => golem_rust::bindings::wasi::logging::logging::log( + golem_rust::bindings::wasi::logging::logging::Level::Info, + &format!("{idx}"), + &format!("{x}"), + ), + 6 => golem_rust::bindings::wasi::logging::logging::log( + golem_rust::bindings::wasi::logging::logging::Level::Warn, + &format!("{idx}"), + &format!("{x}"), + ), + 7 => golem_rust::bindings::wasi::logging::logging::log( + golem_rust::bindings::wasi::logging::logging::Level::Trace, + &format!("{idx}"), + &format!("{x}"), + ), + _ => {} + } + } + + fn log(idx: i32, x: u32) { + match idx % 8 { + 0 => { + println!("{idx}. message: {x}"); + } + 1 => { + eprintln!("{idx}. message: {x}"); + } + 2 => log::error!(idx; "{x}"), + 3 => log::error!(idx; "{x}"), + 4 => log::debug!(idx; "{x}"), + 5 => log::info!(idx; "{x}"), + 6 => log::warn!(idx; "{x}"), + 7 => log::trace!(idx; "{x}"), + _ => {} + } + } +} + +bindings::export!(Component with_types_in bindings); diff --git a/test-components/logging/wit/logging.wit b/test-components/logging/wit/logging.wit new file mode 100644 index 0000000000..09c2dce1f5 --- /dev/null +++ b/test-components/logging/wit/logging.wit @@ -0,0 +1,12 @@ +package golem:it; + +interface api { + init: func(); + forever-random-entries: func(); + some-random-entries: func(); + forever-random-entries-with-log: func(); +} + +world logging { + export api; +}