Skip to content

Commit

Permalink
Worker connect improvements (#745)
Browse files Browse the repository at this point in the history
* Started implementing nicer connect output

* Fixes for worker connect streams on all levels

* Timestamp and prefix options

* Cargo.lock

* Timestamp support

* Support json and yaml mode

* Support --connect in invoke and invoke-and-await

* Removed debug prints

* Format
  • Loading branch information
vigoo authored Aug 13, 2024
1 parent 30c8981 commit 56da5c9
Show file tree
Hide file tree
Showing 37 changed files with 2,704 additions and 525 deletions.
17 changes: 12 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ exclude = [
"test-components/http-client-2",
"test-components/interruption",
"test-components/key-value-service",
"test-components/logging",
"test-components/networking",
"test-components/option-service",
"test-components/promise",
Expand Down
24 changes: 0 additions & 24 deletions golem-api-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.file_descriptor_set_path(out_dir.join("services.bin"))
.extern_path(".wasm.rpc", "::golem_wasm_rpc::protobuf")
.extern_path(".wasm.ast", "::golem_wasm_ast::analysis::protobuf")
.type_attribute(
"golem.worker.LogEvent",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.LogEvent.event",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.StdOutLog",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.StdErrLog",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.Level",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.Log",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.include_file("mod.rs")
.compile(
&[
Expand Down
5 changes: 5 additions & 0 deletions golem-api-grpc/proto/golem/worker/log_event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto3";

package golem.worker;

import "google/protobuf/timestamp.proto";

message LogEvent {
oneof event {
StdOutLog stdout = 1;
Expand All @@ -12,10 +14,12 @@ message LogEvent {

message StdOutLog {
string message = 1;
google.protobuf.Timestamp timestamp = 2;
}

message StdErrLog {
string message = 1;
google.protobuf.Timestamp timestamp = 2;
}

enum Level {
Expand All @@ -31,4 +35,5 @@ message Log {
Level level = 1;
string context = 2;
string message = 3;
google.protobuf.Timestamp timestamp = 4;
}
1 change: 1 addition & 0 deletions golem-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ http = { workspace = true }
hyper = { workspace = true }
indoc = "2.0.4"
itertools = { workspace = true }
lenient_bool = "0.1.1"
log = { version = "0.4.22" }
native-tls = "0.2.11"
openapiv3 = { workspace = true }
Expand Down
22 changes: 20 additions & 2 deletions golem-cli/src/clients/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::command::worker::WorkerConnectOptions;
use crate::model::{
GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode,
Format, GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode,
WorkersMetadataResponse,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -67,7 +68,24 @@ pub trait WorkerClient {
count: Option<u64>,
precise: Option<bool>,
) -> Result<WorkersMetadataResponse, GolemError>;
async fn connect(&self, worker_urn: WorkerUrn) -> Result<(), GolemError>;
async fn connect(
&self,
worker_urn: WorkerUrn,
connect_options: WorkerConnectOptions,
format: Format,
) -> Result<(), GolemError>;

async fn connect_forever(
&self,
worker_urn: WorkerUrn,
connect_options: WorkerConnectOptions,
format: Format,
) -> Result<(), GolemError> {
loop {
self.connect(worker_urn.clone(), connect_options.clone(), format)
.await?;
}
}

async fn update(
&self,
Expand Down
156 changes: 132 additions & 24 deletions golem-cli/src/command/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@

use crate::command::ComponentRefSplit;
use clap::builder::ValueParser;
use clap::{ArgMatches, Error, FromArgMatches, Subcommand};
use clap::{ArgMatches, Args, Error, FromArgMatches, Subcommand};
use golem_client::model::ScanCursor;
use golem_common::model::WorkerId;
use golem_common::uri::oss::uri::{ComponentUri, WorkerUri};
use golem_common::uri::oss::url::{ComponentUrl, WorkerUrl};
use golem_common::uri::oss::urn::{ComponentUrn, WorkerUrn};
use std::sync::Arc;
use tokio::join;
use tokio::task::spawn;

use crate::model::{
Format, GolemError, GolemResult, IdempotencyKey, JsonValueParser, WorkerName, WorkerUpdateMode,
};
use crate::oss::model::OssContext;
use crate::parse_key_val;
use crate::service::project::ProjectResolver;
use crate::service::worker::WorkerService;
use crate::{parse_bool, parse_key_val};

#[derive(clap::Args, Debug, Clone)]
pub struct OssWorkerNameOrUriArg {
Expand Down Expand Up @@ -211,6 +214,42 @@ impl From<&OssWorkerUriArg> for OssWorkerNameOrUriArg {
}
}

#[derive(Args, Debug, Clone)]
pub struct WorkerConnectOptions {
/// Use colored log lines in text mode
#[arg(long,
action=clap::ArgAction::Set,
value_parser = parse_bool,
default_missing_value="true",
default_value_t=true,
num_args = 0..=1,
require_equals = false,
)]
pub colors: bool,

/// Show a timestamp for each log line
#[arg(long,
action=clap::ArgAction::Set,
value_parser = parse_bool,
default_missing_value="true",
default_value_t=true,
num_args = 0..=1,
require_equals = false,
)]
pub show_timestamp: bool,

/// Show the source or log level for each log line
#[arg(long,
action=clap::ArgAction::Set,
value_parser = parse_bool,
default_missing_value="true",
default_value_t=false,
num_args = 0..=1,
require_equals = false,
)]
pub show_level: bool,
}

#[derive(Subcommand, Debug)]
#[command()]
pub enum WorkerSubcommand<ComponentRef: clap::Args, WorkerRef: clap::Args> {
Expand Down Expand Up @@ -254,6 +293,13 @@ pub enum WorkerSubcommand<ComponentRef: clap::Args, WorkerRef: clap::Args> {

#[command(flatten)]
parameters: InvokeParameterList,

/// Connect to the worker during the invocation and show its logs
#[arg(long)]
connect: bool,

#[command(flatten)]
connect_options: WorkerConnectOptions,
},

/// Triggers a function invocation on a worker without waiting for its completion
Expand All @@ -272,13 +318,22 @@ pub enum WorkerSubcommand<ComponentRef: clap::Args, WorkerRef: clap::Args> {

#[command(flatten)]
parameters: InvokeParameterList,

/// Connect to the worker and show its logs
#[arg(long)]
connect: bool,

#[command(flatten)]
connect_options: WorkerConnectOptions,
},

/// Connect to a worker and live stream its standard output, error and log channels
#[command()]
Connect {
#[command(flatten)]
worker_ref: WorkerRef,
#[command(flatten)]
connect_options: WorkerConnectOptions,
},

/// Interrupts a running worker
Expand Down Expand Up @@ -373,11 +428,14 @@ impl WorkerRefSplit<OssContext> for OssWorkerUriArg {
}

impl<ComponentRef: clap::Args, WorkerRef: clap::Args> WorkerSubcommand<ComponentRef, WorkerRef> {
pub async fn handle<ProjectRef: Send + Sync + 'static, ProjectContext: Send + Sync>(
pub async fn handle<
ProjectRef: Send + Sync + 'static,
ProjectContext: Clone + Send + Sync + 'static,
>(
self,
format: Format,
service: &(dyn WorkerService<ProjectContext = ProjectContext> + Send + Sync),
projects: &(dyn ProjectResolver<ProjectRef, ProjectContext> + Send + Sync),
service: Arc<dyn WorkerService<ProjectContext = ProjectContext> + Send + Sync>,
projects: Arc<dyn ProjectResolver<ProjectRef, ProjectContext> + Send + Sync>,
) -> Result<GolemResult, GolemError>
where
ComponentRef: ComponentRefSplit<ProjectRef>,
Expand All @@ -402,44 +460,94 @@ impl<ComponentRef: clap::Args, WorkerRef: clap::Args> WorkerSubcommand<Component
idempotency_key,
function,
parameters,
connect,
connect_options,
} => {
let (worker_uri, project_ref) = worker_ref.split();
let project_id = projects.resolve_id_or_default_opt(project_ref).await?;
service
.invoke_and_await(
format,
worker_uri,
idempotency_key,
function,
parameters.parameters,
parameters.wave,
project_id,
)
.await
if connect {
let worker_uri_clone = worker_uri.clone();
let project_id_clone = project_id.clone();
let service_clone = service.clone();
let connect_handle = spawn(async move {
let _ = service_clone
.connect(worker_uri_clone, project_id_clone, connect_options, format)
.await;
});
let result = service
.invoke_and_await(
format,
worker_uri,
idempotency_key,
function,
parameters.parameters,
parameters.wave,
project_id,
)
.await;

connect_handle.abort();
result
} else {
service
.invoke_and_await(
format,
worker_uri,
idempotency_key,
function,
parameters.parameters,
parameters.wave,
project_id,
)
.await
}
}
WorkerSubcommand::Invoke {
worker_ref,
idempotency_key,
function,
parameters,
connect,
connect_options,
} => {
let (worker_uri, project_ref) = worker_ref.split();
let project_id = projects.resolve_id_or_default_opt(project_ref).await?;
service
.invoke(
worker_uri,

if connect {
let invoke_future = service.invoke(
worker_uri.clone(),
idempotency_key,
function,
parameters.parameters,
parameters.wave,
project_id,
)
.await
project_id.clone(),
);
let connect_future =
service.connect(worker_uri, project_id, connect_options, format);

join!(invoke_future, connect_future).0
} else {
service
.invoke(
worker_uri,
idempotency_key,
function,
parameters.parameters,
parameters.wave,
project_id,
)
.await
}
}
WorkerSubcommand::Connect { worker_ref } => {
WorkerSubcommand::Connect {
worker_ref,
connect_options,
} => {
let (worker_uri, project_ref) = worker_ref.split();
let project_id = projects.resolve_id_or_default_opt(project_ref).await?;
service.connect(worker_uri, project_id).await
service
.connect(worker_uri, project_id, connect_options, format)
.await
}
WorkerSubcommand::Interrupt { worker_ref } => {
let (worker_uri, project_ref) = worker_ref.split();
Expand Down
Loading

0 comments on commit 56da5c9

Please sign in to comment.