Skip to content

Commit

Permalink
Optimize ReadOnlyInvocationStatusTable#all_invoked_or_killed_invocati…
Browse files Browse the repository at this point in the history
…ons, by avoiding loading the full InvocationStatus in memory. (#2349)
  • Loading branch information
slinkydeveloper authored Dec 6, 2024
1 parent 8c5ae2e commit 6437fbd
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 12 deletions.
15 changes: 7 additions & 8 deletions crates/partition-store/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::invocation_status_table::{
InvocationStatus, InvocationStatusTable, InvocationStatusV1,
InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable,
InvocationLite, InvocationStatus, InvocationStatusDiscriminants, InvocationStatusTable,
InvocationStatusV1, InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable,
};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey};
Expand Down Expand Up @@ -258,20 +258,19 @@ fn read_invoked_or_killed_status_lite(
mut k: &mut &[u8],
v: &mut &[u8],
) -> Result<Option<InvokedOrKilledInvocationStatusLite>> {
// TODO this can be improved by simply parsing InvocationTarget and the Status enum
let invocation_id = invocation_id_from_key_bytes(&mut k)?;
let invocation_status = StorageCodec::decode::<InvocationStatus, _>(v)
let invocation_status = StorageCodec::decode::<InvocationLite, _>(v)
.map_err(|err| StorageError::Generic(err.into()))?;
if let InvocationStatus::Invoked(invocation_meta) = invocation_status {
if let InvocationStatusDiscriminants::Invoked = invocation_status.status {
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_meta.invocation_target,
invocation_target: invocation_status.invocation_target,
is_invoked: true,
}))
} else if let InvocationStatus::Killed(invocation_meta) = invocation_status {
} else if let InvocationStatusDiscriminants::Killed = invocation_status.status {
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_meta.invocation_target,
invocation_target: invocation_status.invocation_target,
is_invoked: false,
}))
} else {
Expand Down
6 changes: 6 additions & 0 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ message InvocationStatusV2 {
ResponseResult result = 18;
}

// Slimmer version of InvocationStatusV2
message InvocationV2Lite {
InvocationStatusV2.Status status = 1;
InvocationTarget invocation_target = 2;
}

// TODO remove this after 1.1
message InvocationStatus {

Expand Down
19 changes: 19 additions & 0 deletions crates/storage-api/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,25 @@ impl InvocationStatus {

protobuf_storage_encode_decode!(InvocationStatus, crate::storage::v1::InvocationStatusV2);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InvocationStatusDiscriminants {
Scheduled,
Inboxed,
Invoked,
Suspended,
Killed,
Completed,
}

/// Lite status of an invocation.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct InvocationLite {
pub status: InvocationStatusDiscriminants,
pub invocation_target: InvocationTarget,
}

protobuf_storage_encode_decode!(InvocationLite, crate::storage::v1::InvocationV2Lite);

/// Wrapper used by the table implementation only for the migration, don't use it!
#[derive(Debug, Default, Clone, PartialEq)]
pub struct InvocationStatusV1(pub InvocationStatus);
Expand Down
59 changes: 55 additions & 4 deletions crates/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ pub mod v1 {
virtual_object_status, BackgroundCallResolutionResult, DedupSequenceNumber, Duration,
EnrichedEntryHeader, EntryResult, EpochSequenceNumber, Header, IdempotencyId,
IdempotencyMetadata, InboxEntry, InvocationId, InvocationResolutionResult,
InvocationStatus, InvocationStatusV2, InvocationTarget, JournalEntry, JournalEntryId,
JournalMeta, KvPair, OutboxMessage, Promise, ResponseResult, SequenceNumber, ServiceId,
ServiceInvocation, ServiceInvocationResponseSink, Source, SpanContext, SpanRelation,
StateMutation, SubmitNotificationSink, Timer, VirtualObjectStatus,
InvocationStatus, InvocationStatusV2, InvocationTarget, InvocationV2Lite, JournalEntry,
JournalEntryId, JournalMeta, KvPair, OutboxMessage, Promise, ResponseResult,
SequenceNumber, ServiceId, ServiceInvocation, ServiceInvocationResponseSink, Source,
SpanContext, SpanRelation, StateMutation, SubmitNotificationSink, Timer,
VirtualObjectStatus,
};
use crate::StorageError;
use restate_types::errors::{IdDecodeError, InvocationError};
Expand Down Expand Up @@ -867,6 +868,56 @@ pub mod v1 {
}
}

impl TryFrom<InvocationV2Lite> for crate::invocation_status_table::InvocationLite {
type Error = ConversionError;

fn try_from(value: InvocationV2Lite) -> Result<Self, Self::Error> {
let InvocationV2Lite {
status,
invocation_target,
} = value;

let invocation_target = expect_or_fail!(invocation_target)?.try_into()?;
let status = match status.try_into().unwrap_or_default() {
invocation_status_v2::Status::Scheduled => {
crate::invocation_status_table::InvocationStatusDiscriminants::Scheduled
}
invocation_status_v2::Status::Inboxed => {
crate::invocation_status_table::InvocationStatusDiscriminants::Inboxed
}
invocation_status_v2::Status::Invoked => {
crate::invocation_status_table::InvocationStatusDiscriminants::Invoked
}
invocation_status_v2::Status::Suspended => {
crate::invocation_status_table::InvocationStatusDiscriminants::Suspended
}
invocation_status_v2::Status::Killed => {
crate::invocation_status_table::InvocationStatusDiscriminants::Killed
}
invocation_status_v2::Status::Completed => {
crate::invocation_status_table::InvocationStatusDiscriminants::Completed
}
_ => {
return Err(ConversionError::unexpected_enum_variant(
"status",
value.status,
))
}
};

Ok((crate::invocation_status_table::InvocationLite {
status,
invocation_target,
}))
}
}

impl From<crate::invocation_status_table::InvocationLite> for InvocationV2Lite {
fn from(_: crate::invocation_status_table::InvocationLite) -> Self {
panic!("Unexpected usage of InvocationLite, this data structure can be used only for reading, and never for writing")
}
}

impl TryFrom<InvocationStatus> for crate::invocation_status_table::InvocationStatusV1 {
type Error = ConversionError;

Expand Down

0 comments on commit 6437fbd

Please sign in to comment.