Blob-store based oplog archive (#538)
* OwnedWorkerId refactoring and initial blob-store oplog archive

* Fixes

* Tests and fixes

* Fix unit test

* Fix
vigoo authored May 31, 2024
1 parent 94d785a commit 6c713a5
Showing 48 changed files with 1,981 additions and 860 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -349,7 +349,7 @@ jobs:
SCCACHE_GHA_ENABLED: "true"
RUSTC_WRAPPER: "sccache"
run: cargo make worker-executor-tests-${{ matrix.group.name }}
timeout-minutes: 20
timeout-minutes: 25
integration-tests:
runs-on: ubuntu-latest-large
steps:
19 changes: 17 additions & 2 deletions golem-api-grpc/proto/golem/workerexecutor/worker_executor.proto
@@ -25,12 +25,12 @@ service WorkerExecutor {
rpc InvokeAndAwaitWorker(InvokeAndAwaitWorkerRequest) returns (InvokeAndAwaitWorkerResponse);
rpc InvokeWorker(InvokeWorkerRequest) returns (InvokeWorkerResponse);
rpc ConnectWorker(ConnectWorkerRequest) returns (stream golem.worker.LogEvent);
rpc DeleteWorker(golem.worker.WorkerId) returns (DeleteWorkerResponse);
rpc DeleteWorker(DeleteWorkerRequest) returns (DeleteWorkerResponse);
rpc CompletePromise(CompletePromiseRequest) returns (CompletePromiseResponse);
rpc InterruptWorker(InterruptWorkerRequest) returns (InterruptWorkerResponse);
rpc RevokeShards(RevokeShardsRequest) returns (RevokeShardsResponse);
rpc AssignShards(AssignShardsRequest) returns (AssignShardsResponse);
rpc GetWorkerMetadata(golem.worker.WorkerId) returns (GetWorkerMetadataResponse);
rpc GetWorkerMetadata(GetWorkerMetadataRequest) returns (GetWorkerMetadataResponse);
rpc ResumeWorker(ResumeWorkerRequest) returns (ResumeWorkerResponse);
rpc GetRunningWorkersMetadata(GetRunningWorkersMetadataRequest) returns (GetRunningWorkersMetadataResponse);
rpc GetWorkersMetadata(GetWorkersMetadataRequest) returns (GetWorkersMetadataResponse);
@@ -44,6 +44,11 @@ message InvokeWorkerResponse {
}
}

message DeleteWorkerRequest {
golem.worker.WorkerId worker_id = 1;
golem.common.AccountId account_id = 2;
}

message DeleteWorkerResponse {
oneof result {
golem.common.Empty success = 1;
@@ -54,6 +59,7 @@ message DeleteWorkerResponse {
message CompletePromiseRequest {
golem.worker.PromiseId promise_id = 1;
bytes data = 2;
golem.common.AccountId account_id = 3;
}

message CompletePromiseResponse {
@@ -129,6 +135,7 @@ message ConnectWorkerRequest {
message InterruptWorkerRequest {
golem.worker.WorkerId worker_id = 1;
bool recover_immediately = 2;
golem.common.AccountId account_id = 3;
}

message RevokeShardsRequest {
@@ -153,6 +160,11 @@ message AssignShardsResponse {
}
}

message GetWorkerMetadataRequest {
golem.worker.WorkerId worker_id = 1;
golem.common.AccountId account_id = 2;
}

message GetWorkerMetadataResponse {
oneof result {
golem.worker.WorkerMetadata success = 1;
Expand All @@ -162,6 +174,7 @@ message GetWorkerMetadataResponse {

message ResumeWorkerRequest {
golem.worker.WorkerId worker_id = 1;
golem.common.AccountId account_id = 2;
}

message ResumeWorkerResponse {
@@ -194,6 +207,7 @@ message GetWorkersMetadataRequest {
golem.worker.Cursor cursor = 3;
uint64 count = 4;
bool precise = 5;
golem.common.AccountId account_id = 6;
}

message GetWorkersMetadataResponse {
@@ -212,6 +226,7 @@ message UpdateWorkerRequest {
golem.worker.WorkerId worker_id = 1;
uint64 target_version = 2;
golem.worker.UpdateMode mode = 3;
golem.common.AccountId account_id = 4;
}

message UpdateWorkerResponse {
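For context, this is the call shape the new request messages imply for a gRPC caller — a minimal sketch assuming the usual tonic/prost-generated Rust types; the client type name, module paths, and variable names below are illustrative, not taken from this PR:

// Hypothetical caller: DeleteWorker no longer takes a bare golem.worker.WorkerId;
// the owning account now travels with the request.
async fn delete_worker_example(
    client: &mut WorkerExecutorClient<tonic::transport::Channel>,
    worker_id: worker::WorkerId,
    account_id: common::AccountId,
) -> Result<(), tonic::Status> {
    client
        .delete_worker(workerexecutor::DeleteWorkerRequest {
            worker_id: Some(worker_id),
            account_id: Some(account_id),
        })
        .await?;
    Ok(())
}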
70 changes: 60 additions & 10 deletions golem-common/src/model/mod.rs
@@ -293,9 +293,46 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerId> for WorkerId {
}
}

/// Associates a worker-id with its owner account
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct OwnedWorkerId {
pub account_id: AccountId,
pub worker_id: WorkerId,
}

impl OwnedWorkerId {
pub fn new(account_id: &AccountId, worker_id: &WorkerId) -> Self {
Self {
account_id: account_id.clone(),
worker_id: worker_id.clone(),
}
}

pub fn worker_id(&self) -> WorkerId {
self.worker_id.clone()
}

pub fn account_id(&self) -> AccountId {
self.account_id.clone()
}

pub fn component_id(&self) -> ComponentId {
self.worker_id.component_id.clone()
}

pub fn worker_name(&self) -> String {
self.worker_id.worker_name.clone()
}
}

impl Display for OwnedWorkerId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.account_id, self.worker_id)
}
}
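A small usage sketch of the new type (illustrative, not code from this commit); it relies only on the constructor, accessors, and Display implementation shown above:

use golem_common::model::{AccountId, OwnedWorkerId, WorkerId};

// The constructor clones both parts and the accessors return owned values,
// so call sites don't need to manage borrows of the metadata they came from.
fn describe(account_id: &AccountId, worker_id: &WorkerId) -> String {
    let owned = OwnedWorkerId::new(account_id, worker_id);
    // Display renders "<account_id>/<worker_id>".
    format!("worker '{}' owned by {}", owned.worker_name(), owned.account_id())
}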

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct PromiseId {
#[serde(rename = "instance_id")]
pub worker_id: WorkerId,
pub oplog_idx: OplogIndex,
}
@@ -348,35 +385,44 @@ impl Display for PromiseId {
#[derive(Debug, Clone, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum ScheduledAction {
/// Completes a given promise
CompletePromise { promise_id: PromiseId },
CompletePromise {
account_id: AccountId,
promise_id: PromiseId,
},
/// Archives all entries from the first non-empty layer of an oplog to the next layer,
/// if the last oplog index did not change. If there are more layers below, schedules
/// a next action to archive the next layer.
ArchiveOplog {
account_id: AccountId,
worker_id: WorkerId,
owned_worker_id: OwnedWorkerId,
last_oplog_index: OplogIndex,
next_after: Duration,
},
}

impl ScheduledAction {
pub fn worker_id(&self) -> &WorkerId {
pub fn owned_worker_id(&self) -> OwnedWorkerId {
match self {
ScheduledAction::CompletePromise { promise_id } => &promise_id.worker_id,
ScheduledAction::ArchiveOplog { worker_id, .. } => worker_id,
ScheduledAction::CompletePromise {
account_id,
promise_id,
} => OwnedWorkerId::new(account_id, &promise_id.worker_id),
ScheduledAction::ArchiveOplog {
owned_worker_id, ..
} => owned_worker_id.clone(),
}
}
}
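Since OwnedWorkerId derives Eq and Hash, the new accessor also makes it easy to bucket scheduled work per owner. A hedged sketch, not part of the change:

use std::collections::HashMap;
use golem_common::model::{OwnedWorkerId, ScheduledAction};

// Group due actions by the (account, worker) pair they target; the accessor
// works uniformly for both CompletePromise and ArchiveOplog.
fn group_by_owner(actions: Vec<ScheduledAction>) -> HashMap<OwnedWorkerId, Vec<ScheduledAction>> {
    let mut grouped: HashMap<OwnedWorkerId, Vec<ScheduledAction>> = HashMap::new();
    for action in actions {
        grouped.entry(action.owned_worker_id()).or_default().push(action);
    }
    grouped
}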

impl Display for ScheduledAction {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ScheduledAction::CompletePromise { promise_id } => {
ScheduledAction::CompletePromise { promise_id, .. } => {
write!(f, "complete[{}]", promise_id)
}
ScheduledAction::ArchiveOplog { worker_id, .. } => {
write!(f, "archive[{}]", worker_id)
ScheduledAction::ArchiveOplog {
owned_worker_id, ..
} => {
write!(f, "archive[{}]", owned_worker_id)
}
}
}
@@ -668,6 +714,10 @@ impl WorkerMetadata {
last_known_status: WorkerStatusRecord::default(),
}
}

pub fn owned_worker_id(&self) -> OwnedWorkerId {
OwnedWorkerId::new(&self.account_id, &self.worker_id)
}
}
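An illustrative call site (not from the commit): code that previously had to pair metadata.account_id with metadata.worker_id by hand can now derive the owner-qualified id in one step, for example to build a key for owner-scoped storage.

use golem_common::model::WorkerMetadata;

// Illustrative only: the Display form of OwnedWorkerId ("account/worker") makes a
// convenient key for anything stored per worker and per owning account.
fn owner_scoped_key(metadata: &WorkerMetadata) -> String {
    metadata.owned_worker_id().to_string()
}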

/// Contains status information about a worker according to a given oplog index.
@@ -29,6 +29,7 @@ object_prefix = ""
compilation_cache_bucket = "compilation-cache"
custom_data_bucket = "custom-data"
oplog_payload_bucket = "oplog-payload"
compressed_oplog_buckets = ["oplog-archive-1"]

[blob_storage.config.retries]
max_attempts = 3
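The new compressed_oplog_buckets entry names the blob-store bucket(s) backing the compressed oplog archive. A plausible reading, consistent with the layered ArchiveOplog scheduling above, is that each listed bucket holds one archive layer, so deeper layers can be configured by listing further buckets, e.g. compressed_oplog_buckets = ["oplog-archive-1", "oplog-archive-2"] (hypothetical).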
38 changes: 36 additions & 2 deletions golem-test-framework/src/components/worker_service/forwarding.rs
@@ -124,7 +124,15 @@ impl WorkerService for ForwardingWorkerService {
.worker_executor
.client()
.await
.delete_worker(request.worker_id.expect("Worker ID is required"))
.delete_worker(workerexecutor::DeleteWorkerRequest {
worker_id: request.worker_id,
account_id: Some(
AccountId {
value: "test-account".to_string(),
}
.into(),
),
})
.await
.expect("Failed to call golem-worker-executor")
.into_inner();
@@ -156,7 +164,15 @@ impl WorkerService for ForwardingWorkerService {
.worker_executor
.client()
.await
.get_worker_metadata(request.worker_id.expect("Worker ID is required"))
.get_worker_metadata(workerexecutor::GetWorkerMetadataRequest {
worker_id: Some(request.worker_id.expect("Worker ID is required")),
account_id: Some(
AccountId {
value: "test-account".to_string(),
}
.into(),
),
})
.await
.expect("Failed to call golem-worker-executor")
.into_inner();
@@ -308,6 +324,12 @@ impl WorkerService for ForwardingWorkerService {
.await
.resume_worker(workerexecutor::ResumeWorkerRequest {
worker_id: request.worker_id,
account_id: Some(
AccountId {
value: "test-account".to_string(),
}
.into(),
),
})
.await
.expect("Failed to call golem-worker-executor")
@@ -340,6 +362,12 @@ impl WorkerService for ForwardingWorkerService {
.interrupt_worker(workerexecutor::InterruptWorkerRequest {
worker_id: request.worker_id,
recover_immediately: request.recover_immediately,
account_id: Some(
AccountId {
value: "test-account".to_string(),
}
.into(),
),
})
.await
.expect("Failed to call golem-worker-executor")
@@ -375,6 +403,12 @@ impl WorkerService for ForwardingWorkerService {
worker_id: request.worker_id,
target_version: request.target_version,
mode: request.mode,
account_id: Some(
AccountId {
value: "test-account".to_string(),
}
.into(),
),
})
.await
.expect("Failed to call golem-worker-executor")
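The fixed "test-account" owner is repeated in every forwarded request above; a small helper along these lines (hypothetical, not part of the commit; module paths are assumptions) would remove the duplication:

use golem_api_grpc::proto::golem::common;
use golem_common::model::AccountId;

// Hypothetical helper for the forwarding test service: all forwarded requests
// use the same fixed test owner.
fn test_account_id() -> Option<common::AccountId> {
    Some(
        AccountId {
            value: "test-account".to_string(),
        }
        .into(),
    )
}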
1 change: 1 addition & 0 deletions golem-worker-executor-base/config/worker-service.toml
@@ -36,6 +36,7 @@ object_prefix = ""
compilation_cache_bucket = "compilation-cache"
custom_data_bucket = "custom-data"
oplog_payload_bucket = "oplog-payload"
compressed_oplog_buckets = ["oplog-archive-1"]

[blob_storage.config.retries]
max_attempts = 3
24 changes: 16 additions & 8 deletions golem-worker-executor-base/src/durable_host/blobstore/container.rs
@@ -68,7 +68,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
end: u64,
) -> anyhow::Result<Result<Resource<IncomingValue>, Error>> {
record_host_function_call("blobstore::container::container", "get_data");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()
@@ -108,7 +109,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
data: Resource<OutgoingValue>,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "write_data");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()
@@ -144,7 +146,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
container: Resource<Container>,
) -> anyhow::Result<Result<Resource<StreamObjectNames>, Error>> {
record_host_function_call("blobstore::container::container", "list_objects");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()
@@ -179,7 +182,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
name: ObjectName,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "delete_object");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()
@@ -210,7 +214,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
names: Vec<ObjectName>,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "delete_objects");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()
@@ -241,7 +246,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
name: ObjectName,
) -> anyhow::Result<Result<bool, Error>> {
record_host_function_call("blobstore::container::container", "has_object");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()
@@ -272,7 +278,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
name: ObjectName,
) -> anyhow::Result<Result<ObjectMetadata, Error>> {
record_host_function_call("blobstore::container::container", "object_info");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()
@@ -311,7 +318,8 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {

async fn clear(&mut self, container: Resource<Container>) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "clear");
let account_id = self.state.account_id.clone();
let account_id = self.state.owned_worker_id.account_id();

let container_name = self
.as_wasi_view()
.table()