Skip to content

Commit

Permalink
golem-host - get worker metadata (#659)
Browse files Browse the repository at this point in the history
* golem-host - get worker metadata

* fixes

* runtime-service test component - get worker metadata

* rebase fixes

* rebase fixes

* helm chart fix
  • Loading branch information
justcoon authored Jul 15, 2024
1 parent e68f3f5 commit 26d937b
Show file tree
Hide file tree
Showing 17 changed files with 1,100 additions and 380 deletions.
356 changes: 178 additions & 178 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion golem-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ clap-verbosity-flag = "2.1.1"
derive_more = { workspace = true }
dirs = "5.0.1"
futures-util = { workspace = true }
golem-examples = "0.3.0"
golem-examples = "0.3.1"
golem-wasm-ast = { workspace = true }
golem-wasm-rpc = { workspace = true }
golem-wasm-rpc-stubgen = { version = "0.0.33", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fs-set-times = "0.20.1"
futures = { workspace = true }
futures-util = { workspace = true }
gethostname = "0.4.3"
golem-wit = { version = "0.3.0" }
golem-wit = { version = "0.3.1" }
http = { workspace = true }
http_02 = { workspace = true }
http-body = "1.0.0" # keep in sync with wasmtime
Expand Down
18 changes: 17 additions & 1 deletion golem-worker-executor-base/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,23 @@ fn find_package_root(name: &str) -> String {
.manifest_path("./Cargo.toml")
.exec()
.unwrap();
let package = metadata.packages.iter().find(|p| p.name == name).unwrap();

let package = metadata
.packages
.into_iter()
.fold(None, |acc, p| {
if p.name == name {
match acc {
None => Some(p),
Some(cp) if cp.version < p.version => Some(p),
_ => acc,
}
} else {
acc
}
})
.unwrap();

package.manifest_path.parent().unwrap().to_string()
}

Expand Down
46 changes: 46 additions & 0 deletions golem-worker-executor-base/src/durable_host/golem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use crate::model::InterruptKind;
use crate::preview2::golem;
use crate::preview2::golem::api::host::{
ComponentVersion, HostGetWorkers, PersistenceLevel, RetryPolicy, UpdateMode, Uri,
WorkerMetadata,
};
use crate::services::HasWorker;
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::{OplogEntry, OplogIndex, WrappedFunctionType};
use golem_common::model::regions::OplogRegion;
Expand Down Expand Up @@ -462,6 +464,39 @@ impl<Ctx: WorkerCtx> golem::api::host::Host for DurableWorkerCtx<Ctx> {

Ok(())
}

async fn get_self_metadata(&mut self) -> anyhow::Result<golem::api::host::WorkerMetadata> {
record_host_function_call("golem::api", "get_self_metadata");
let metadata = self.public_state.worker().get_metadata().await?;
Ok(metadata.into())
}

async fn get_worker_metadata(
&mut self,
worker_id: golem::api::host::WorkerId,
) -> anyhow::Result<Option<golem::api::host::WorkerMetadata>> {
record_host_function_call("golem::api", "get_worker_metadata");
let worker_id: WorkerId = worker_id.into();
let owned_worker_id = OwnedWorkerId::new(&self.owned_worker_id.account_id, &worker_id);
let metadata = self.state.worker_service.get(&owned_worker_id).await;

match metadata {
Some(metadata) => {
let last_known_status = Ctx::compute_latest_worker_status(
&self.state,
&owned_worker_id,
&Some(metadata.clone()),
)
.await?;
let updated_metadata = golem_common::model::WorkerMetadata {
last_known_status,
..metadata
};
Ok(Some(updated_metadata.into()))
}
None => Ok(None),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -586,6 +621,17 @@ impl<Ctx: WorkerCtx> golem::api::host::Host for &mut DurableWorkerCtx<Ctx> {
) -> anyhow::Result<()> {
(*self).update_worker(worker_id, target_version, mode).await
}

async fn get_self_metadata(&mut self) -> anyhow::Result<WorkerMetadata> {
(*self).get_self_metadata().await
}

async fn get_worker_metadata(
&mut self,
worker_id: golem::api::host::WorkerId,
) -> anyhow::Result<Option<WorkerMetadata>> {
(*self).get_worker_metadata(worker_id).await
}
}

impl From<WorkerId> for golem::api::host::WorkerId {
Expand Down
10 changes: 8 additions & 2 deletions golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::services::key_value::KeyValueService;
use crate::services::promise::PromiseService;
use crate::services::worker::WorkerService;
use crate::services::worker_event::WorkerEventService;
use crate::services::{worker_enumeration, HasAll, HasOplog, HasWorker};
use crate::services::{golem_config, worker_enumeration, HasAll, HasConfig, HasOplog, HasWorker};
use crate::wasi_host::managed_stdio::ManagedStandardIo;
use crate::workerctx::{
ExternalOperations, IndexedResourceStore, InvocationHooks, InvocationManagement, IoCapturing,
Expand Down Expand Up @@ -1012,7 +1012,7 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
last_error_and_retry_count(this, owned_worker_id).await
}

async fn compute_latest_worker_status<T: HasAll<Ctx> + Send + Sync>(
async fn compute_latest_worker_status<T: HasOplogService + HasConfig + Send + Sync>(
this: &T,
owned_worker_id: &OwnedWorkerId,
metadata: &Option<WorkerMetadata>,
Expand Down Expand Up @@ -1735,6 +1735,12 @@ impl HasOplog for PrivateDurableWorkerState {
}
}

impl HasConfig for PrivateDurableWorkerState {
fn config(&self) -> Arc<golem_config::GolemConfig> {
self.config.clone()
}
}

pub struct PublicDurableWorkerState<Ctx: WorkerCtx> {
promise_service: Arc<dyn PromiseService + Send + Sync>,
event_service: Arc<dyn WorkerEventService + Send + Sync>,
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1684,7 +1684,7 @@ pub async fn calculate_last_known_status<T>(
metadata: &Option<WorkerMetadata>,
) -> Result<WorkerStatusRecord, GolemError>
where
T: HasOplogService + HasWorkerService + HasConfig,
T: HasOplogService + HasConfig,
{
let last_known = metadata
.as_ref()
Expand Down
6 changes: 4 additions & 2 deletions golem-worker-executor-base/src/workerctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ use crate::services::scheduler::SchedulerService;
use crate::services::worker::WorkerService;
use crate::services::worker_event::WorkerEventService;
use crate::services::worker_proxy::WorkerProxy;
use crate::services::{worker_enumeration, HasAll, HasOplog, HasWorker};
use crate::services::{
worker_enumeration, HasAll, HasConfig, HasOplog, HasOplogService, HasWorker,
};
use crate::worker::{RetryDecision, Worker};

/// WorkerCtx is the primary customization and extension point of worker executor. It is the context
Expand Down Expand Up @@ -330,7 +332,7 @@ pub trait ExternalOperations<Ctx: WorkerCtx> {
) -> Option<LastError>;

/// Gets a best-effort current worker status without activating the worker
async fn compute_latest_worker_status<T: HasAll<Ctx> + Send + Sync>(
async fn compute_latest_worker_status<T: HasOplogService + HasConfig + Send + Sync>(
this: &T,
owned_worker_id: &OwnedWorkerId,
metadata: &Option<WorkerMetadata>,
Expand Down
86 changes: 86 additions & 0 deletions golem-worker-executor-base/tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,92 @@ async fn get_workers_from_worker() {
drop(executor);
}

#[tokio::test]
#[tracing::instrument]
async fn get_metadata_from_worker() {
let context = TestContext::new();
let mut executor = start(&context).await.unwrap();

let component_id = executor.store_component("runtime-service").await;

let worker_id1 = executor
.start_worker(&component_id, "runtime-service-1")
.await;

let worker_id2 = executor
.start_worker(&component_id, "runtime-service-2")
.await;

fn get_worker_id_val(worker_id: &WorkerId) -> Value {
let component_id_val = {
let (high, low) = worker_id.component_id.0.as_u64_pair();
Value::Record(vec![Value::Record(vec![Value::U64(high), Value::U64(low)])])
};

Value::Record(vec![
component_id_val,
Value::String(worker_id.worker_name.clone()),
])
}

async fn get_check(
worker_id1: &WorkerId,
worker_id2: &WorkerId,
executor: &mut TestWorkerExecutor,
) {
let worker_id_val1 = get_worker_id_val(worker_id1);

let result = executor
.invoke_and_await(worker_id1, "golem:it/api.{get-self-metadata}", vec![])
.await
.unwrap();

match result.first() {
Some(Value::Record(values)) if !values.is_empty() => {
let id_val = values.first().unwrap();
check!(worker_id_val1 == *id_val);
}
_ => {
check!(false);
}
}

let worker_id_val2 = get_worker_id_val(worker_id2);

let result = executor
.invoke_and_await(
worker_id1,
"golem:it/api.{get-worker-metadata}",
vec![worker_id_val2.clone()],
)
.await
.unwrap();

match result.first() {
Some(Value::Option(value)) if value.is_some() => {
let result = *value.clone().unwrap();
match result {
Value::Record(values) if !values.is_empty() => {
let id_val = values.first().unwrap();
check!(worker_id_val2 == *id_val);
}
_ => {
check!(false);
}
}
}
_ => {
check!(false);
}
}
}

get_check(&worker_id1, &worker_id2, &mut executor).await;
get_check(&worker_id2, &worker_id1, &mut executor).await;

drop(executor);
}

#[tokio::test]
#[tracing::instrument]
async fn invoking_with_same_idempotency_key_is_idempotent() {
Expand Down
4 changes: 2 additions & 2 deletions golem-worker-executor-base/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use golem_worker_executor_base::services::shard_manager::ShardManagerService;
use golem_worker_executor_base::services::worker::WorkerService;
use golem_worker_executor_base::services::worker_activator::WorkerActivator;
use golem_worker_executor_base::services::worker_event::WorkerEventService;
use golem_worker_executor_base::services::{All, HasAll};
use golem_worker_executor_base::services::{All, HasAll, HasConfig, HasOplogService};
use golem_worker_executor_base::wasi_host::create_linker;
use golem_worker_executor_base::workerctx::{
ExternalOperations, FuelManagement, IndexedResourceStore, InvocationHooks,
Expand Down Expand Up @@ -429,7 +429,7 @@ impl ExternalOperations<TestWorkerCtx> for TestWorkerCtx {
.await
}

async fn compute_latest_worker_status<T: HasAll<TestWorkerCtx> + Send + Sync>(
async fn compute_latest_worker_status<T: HasOplogService + HasConfig + Send + Sync>(
this: &T,
owned_worker_id: &OwnedWorkerId,
metadata: &Option<WorkerMetadata>,
Expand Down
6 changes: 4 additions & 2 deletions golem-worker-executor/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use golem_worker_executor_base::services::scheduler::SchedulerService;
use golem_worker_executor_base::services::worker::WorkerService;
use golem_worker_executor_base::services::worker_event::WorkerEventService;
use golem_worker_executor_base::services::worker_proxy::WorkerProxy;
use golem_worker_executor_base::services::{worker_enumeration, HasAll};
use golem_worker_executor_base::services::{
worker_enumeration, HasAll, HasConfig, HasOplogService,
};
use golem_worker_executor_base::worker::{RetryDecision, Worker};
use golem_worker_executor_base::workerctx::{
ExternalOperations, FuelManagement, IndexedResourceStore, InvocationHooks,
Expand Down Expand Up @@ -96,7 +98,7 @@ impl ExternalOperations<Context> for Context {
DurableWorkerCtx::<Context>::get_last_error_and_retry_count(this, worker_id).await
}

async fn compute_latest_worker_status<T: HasAll<Context> + Send + Sync>(
async fn compute_latest_worker_status<T: HasOplogService + HasConfig + Send + Sync>(
this: &T,
worker_id: &OwnedWorkerId,
metadata: &Option<WorkerMetadata>,
Expand Down
11 changes: 11 additions & 0 deletions kube/golem-chart/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,17 @@ spec:
value: {{ .Values.workerService.postgres.schema }}
- name: GOLEM__DB__CONFIG__HOST
value: {{ .Values.workerService.postgres.host }}
{{- if .Values.workerService.postgres.password.plain }}
- name: GOLEM__DB__CONFIG__PASSWORD
value: "{{ .Values.workerService.postgres.password.plain }}"
{{- end }}
{{- if .Values.workerService.postgres.password.fromSecret.name }}
- name: GOLEM__DB__CONFIG__PASSWORD
valueFrom:
secretKeyRef:
name: {{ .Values.workerService.postgres.password.fromSecret.name }}
key: {{ .Values.workerService.postgres.password.fromSecret.key }}
{{- end }}
- name: GOLEM__DB__CONFIG__PORT
value: "{{ .Values.workerService.postgres.port }}"
- name: GOLEM__DB__CONFIG__USERNAME
Expand Down
Binary file modified test-components/runtime-service.wasm
Binary file not shown.
Loading

0 comments on commit 26d937b

Please sign in to comment.