From 6437fbd83f1b718a653f51eb86738fe0cf300e9b Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Fri, 6 Dec 2024 10:30:01 +0100 Subject: [PATCH] Optimize ReadOnlyInvocationStatusTable#all_invoked_or_killed_invocations, by avoiding loading the full InvocationStatus in memory. (#2349) --- .../src/invocation_status_table/mod.rs | 15 +++-- .../proto/dev/restate/storage/v1/domain.proto | 6 ++ .../src/invocation_status_table/mod.rs | 19 ++++++ crates/storage-api/src/storage.rs | 59 +++++++++++++++++-- 4 files changed, 87 insertions(+), 12 deletions(-) diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index be134fbbf..bfd2b38bc 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -17,8 +17,8 @@ use futures::Stream; use futures_util::stream; use restate_rocksdb::RocksDbPerfGuard; use restate_storage_api::invocation_status_table::{ - InvocationStatus, InvocationStatusTable, InvocationStatusV1, - InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, + InvocationLite, InvocationStatus, InvocationStatusDiscriminants, InvocationStatusTable, + InvocationStatusV1, InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, }; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; @@ -258,20 +258,19 @@ fn read_invoked_or_killed_status_lite( mut k: &mut &[u8], v: &mut &[u8], ) -> Result> { - // TODO this can be improved by simply parsing InvocationTarget and the Status enum let invocation_id = invocation_id_from_key_bytes(&mut k)?; - let invocation_status = StorageCodec::decode::(v) + let invocation_status = StorageCodec::decode::(v) .map_err(|err| StorageError::Generic(err.into()))?; - if let InvocationStatus::Invoked(invocation_meta) = invocation_status { + if let InvocationStatusDiscriminants::Invoked = invocation_status.status { Ok(Some(InvokedOrKilledInvocationStatusLite { invocation_id, - invocation_target: invocation_meta.invocation_target, + invocation_target: invocation_status.invocation_target, is_invoked: true, })) - } else if let InvocationStatus::Killed(invocation_meta) = invocation_status { + } else if let InvocationStatusDiscriminants::Killed = invocation_status.status { Ok(Some(InvokedOrKilledInvocationStatusLite { invocation_id, - invocation_target: invocation_meta.invocation_target, + invocation_target: invocation_status.invocation_target, is_invoked: false, })) } else { diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 3083aaf25..389b5cb6d 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -161,6 +161,12 @@ message InvocationStatusV2 { ResponseResult result = 18; } +// Slimmer version of InvocationStatusV2 +message InvocationV2Lite { + InvocationStatusV2.Status status = 1; + InvocationTarget invocation_target = 2; +} + // TODO remove this after 1.1 message InvocationStatus { diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index 677112f3d..70b434219 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -351,6 +351,25 @@ impl InvocationStatus { protobuf_storage_encode_decode!(InvocationStatus, crate::storage::v1::InvocationStatusV2); +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum InvocationStatusDiscriminants { + Scheduled, + Inboxed, + Invoked, + Suspended, + Killed, + Completed, +} + +/// Lite status of an invocation. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct InvocationLite { + pub status: InvocationStatusDiscriminants, + pub invocation_target: InvocationTarget, +} + +protobuf_storage_encode_decode!(InvocationLite, crate::storage::v1::InvocationV2Lite); + /// Wrapper used by the table implementation only for the migration, don't use it! #[derive(Debug, Default, Clone, PartialEq)] pub struct InvocationStatusV1(pub InvocationStatus); diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 7d609df1a..a198d05a3 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -103,10 +103,11 @@ pub mod v1 { virtual_object_status, BackgroundCallResolutionResult, DedupSequenceNumber, Duration, EnrichedEntryHeader, EntryResult, EpochSequenceNumber, Header, IdempotencyId, IdempotencyMetadata, InboxEntry, InvocationId, InvocationResolutionResult, - InvocationStatus, InvocationStatusV2, InvocationTarget, JournalEntry, JournalEntryId, - JournalMeta, KvPair, OutboxMessage, Promise, ResponseResult, SequenceNumber, ServiceId, - ServiceInvocation, ServiceInvocationResponseSink, Source, SpanContext, SpanRelation, - StateMutation, SubmitNotificationSink, Timer, VirtualObjectStatus, + InvocationStatus, InvocationStatusV2, InvocationTarget, InvocationV2Lite, JournalEntry, + JournalEntryId, JournalMeta, KvPair, OutboxMessage, Promise, ResponseResult, + SequenceNumber, ServiceId, ServiceInvocation, ServiceInvocationResponseSink, Source, + SpanContext, SpanRelation, StateMutation, SubmitNotificationSink, Timer, + VirtualObjectStatus, }; use crate::StorageError; use restate_types::errors::{IdDecodeError, InvocationError}; @@ -867,6 +868,56 @@ pub mod v1 { } } + impl TryFrom for crate::invocation_status_table::InvocationLite { + type Error = ConversionError; + + fn try_from(value: InvocationV2Lite) -> Result { + let InvocationV2Lite { + status, + invocation_target, + } = value; + + let invocation_target = expect_or_fail!(invocation_target)?.try_into()?; + let status = match status.try_into().unwrap_or_default() { + invocation_status_v2::Status::Scheduled => { + crate::invocation_status_table::InvocationStatusDiscriminants::Scheduled + } + invocation_status_v2::Status::Inboxed => { + crate::invocation_status_table::InvocationStatusDiscriminants::Inboxed + } + invocation_status_v2::Status::Invoked => { + crate::invocation_status_table::InvocationStatusDiscriminants::Invoked + } + invocation_status_v2::Status::Suspended => { + crate::invocation_status_table::InvocationStatusDiscriminants::Suspended + } + invocation_status_v2::Status::Killed => { + crate::invocation_status_table::InvocationStatusDiscriminants::Killed + } + invocation_status_v2::Status::Completed => { + crate::invocation_status_table::InvocationStatusDiscriminants::Completed + } + _ => { + return Err(ConversionError::unexpected_enum_variant( + "status", + value.status, + )) + } + }; + + Ok((crate::invocation_status_table::InvocationLite { + status, + invocation_target, + })) + } + } + + impl From for InvocationV2Lite { + fn from(_: crate::invocation_status_table::InvocationLite) -> Self { + panic!("Unexpected usage of InvocationLite, this data structure can be used only for reading, and never for writing") + } + } + impl TryFrom for crate::invocation_status_table::InvocationStatusV1 { type Error = ConversionError;