From 561679c245ac0c47ccfd5ec2d8f91ca9498c6b28 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Fri, 6 Dec 2024 09:56:25 +0100 Subject: [PATCH] Introduce InvocationStatus::Killed (#2335) Add InvocationStatus::Killed as experimental feature. Fix cancellation/killing of scheduled invocations. Reply KILLED when receiving attach/get output and deduplication. Handle killed invocations during leadership change. Add acknowledgment flag to invoker abort command. This makes sure that after a kill, the invoker will send a "terminal" effect to the state machine. --- .github/workflows/ci.yml | 18 + Cargo.lock | 1 + cli/src/clients/datafusion_helpers.rs | 3 + cli/src/ui/invocations.rs | 1 + crates/invoker-api/src/handle.rs | 2 + crates/invoker-api/src/lib.rs | 1 + crates/invoker-impl/src/input_command.rs | 3 + crates/invoker-impl/src/lib.rs | 22 +- .../src/invocation_status_table/mod.rs | 46 +- .../tests/invocation_status_table_test/mod.rs | 44 +- .../proto/dev/restate/storage/v1/domain.proto | 1 + .../src/invocation_status_table/mod.rs | 26 +- crates/storage-api/src/storage.rs | 94 ++++ crates/storage-api/src/timer_table/mod.rs | 2 +- .../src/invocation_status/row.rs | 4 + .../src/invocation_status/schema.rs | 2 +- crates/types/src/config/worker.rs | 8 + crates/worker/Cargo.toml | 1 + crates/worker/src/partition/cleaner.rs | 6 +- .../src/partition/leadership/leader_state.rs | 15 +- crates/worker/src/partition/leadership/mod.rs | 60 ++- crates/worker/src/partition/mod.rs | 27 +- .../src/partition/state_machine/actions.rs | 5 +- .../worker/src/partition/state_machine/mod.rs | 424 +++++++++++++----- .../state_machine/tests/idempotency.rs | 80 ++-- .../state_machine/tests/kill_cancel.rs | 237 ++++++++-- .../partition/state_machine/tests/matchers.rs | 48 +- .../src/partition/state_machine/tests/mod.rs | 11 +- .../partition/state_machine/tests/workflow.rs | 26 +- 29 files changed, 927 insertions(+), 291 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 32dbd016c..c7eac74e3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,6 +117,24 @@ jobs: RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug testArtifactOutput: sdk-java-kafka-next-gen-integration-test-report + sdk-java-invocation-status-killed: + name: Run SDK-Java integration tests with InvocationStatusKilled + permissions: + contents: read + issues: read + checks: write + pull-requests: write + actions: read + secrets: inherit + needs: docker + uses: restatedev/sdk-java/.github/workflows/integration.yaml@main + with: + restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} + envVars: | + RESTATE_WORKER__EXPERIMENTAL_FEATURE_INVOCATION_STATUS_KILLED=true + RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug + testArtifactOutput: sdk-java-invocation-status-killed-integration-test-report + sdk-python: name: Run SDK-Python integration tests permissions: diff --git a/Cargo.lock b/Cargo.lock index 6d9f95ca9..3123bb043 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7084,6 +7084,7 @@ dependencies = [ "codederror", "derive_builder", "derive_more", + "enumset", "futures", "googletest", "humantime", diff --git a/cli/src/clients/datafusion_helpers.rs b/cli/src/clients/datafusion_helpers.rs index c6e455c06..d309841d8 100644 --- a/cli/src/clients/datafusion_helpers.rs +++ b/cli/src/clients/datafusion_helpers.rs @@ -128,6 +128,7 @@ pub enum InvocationState { Running, Suspended, BackingOff, + Killed, Completed, } @@ -142,6 +143,7 @@ impl FromStr for InvocationState { "suspended" => Self::Suspended, "backing-off" => Self::BackingOff, "completed" => Self::Completed, + "killed" => Self::Killed, _ => Self::Unknown, }) } @@ -157,6 +159,7 @@ impl Display for InvocationState { InvocationState::Running => write!(f, "running"), InvocationState::Suspended => write!(f, "suspended"), InvocationState::BackingOff => write!(f, "backing-off"), + InvocationState::Killed => write!(f, "killed"), InvocationState::Completed => write!(f, "completed"), } } diff --git a/cli/src/ui/invocations.rs b/cli/src/ui/invocations.rs index e3293bb6a..5f93d0705 100644 --- a/cli/src/ui/invocations.rs +++ b/cli/src/ui/invocations.rs @@ -113,6 +113,7 @@ pub fn invocation_status_style(status: InvocationState) -> Style { InvocationState::Suspended => DStyle::new().dim(), InvocationState::BackingOff => DStyle::new().red(), InvocationState::Completed => DStyle::new().blue(), + InvocationState::Killed => DStyle::new().red(), } } diff --git a/crates/invoker-api/src/handle.rs b/crates/invoker-api/src/handle.rs index 32fdda035..b713ad72e 100644 --- a/crates/invoker-api/src/handle.rs +++ b/crates/invoker-api/src/handle.rs @@ -60,6 +60,8 @@ pub trait InvokerHandle { &mut self, partition_leader_epoch: PartitionLeaderEpoch, invocation_id: InvocationId, + // If true, acknowledge the abort. This will generate a Failed effect + acknowledge: bool, ) -> impl Future> + Send; fn register_partition( diff --git a/crates/invoker-api/src/lib.rs b/crates/invoker-api/src/lib.rs index 1a3be4993..a0067edc8 100644 --- a/crates/invoker-api/src/lib.rs +++ b/crates/invoker-api/src/lib.rs @@ -128,6 +128,7 @@ pub mod test_util { &mut self, _partition_leader_epoch: PartitionLeaderEpoch, _invocation_id: InvocationId, + _acknowledge: bool, ) -> Result<(), NotRunningError> { Ok(()) } diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index 0d82a5ef9..89b006f9f 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -45,6 +45,7 @@ pub(crate) enum InputCommand { Abort { partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, }, /// Command used to clean up internal state when a partition leader is going away @@ -129,11 +130,13 @@ impl restate_invoker_api::InvokerHandle for InvokerHandle { &mut self, partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, ) -> Result<(), NotRunningError> { self.input .send(InputCommand::Abort { partition, invocation_id, + acknowledge, }) .map_err(|_| NotRunningError) } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 960535e9f..30185806c 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -61,6 +61,7 @@ pub use input_command::ChannelStatusReader; pub use input_command::InvokerHandle; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_types::deployment::PinnedDeployment; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::invocation::InvocationTarget; use restate_types::schema::service::ServiceMetadataResolver; @@ -351,8 +352,8 @@ where self.handle_register_partition(partition, partition_key_range, storage_reader, sender); }, - InputCommand::Abort { partition, invocation_id } => { - self.handle_abort_invocation(partition, invocation_id); + InputCommand::Abort { partition, invocation_id, acknowledge } => { + self.handle_abort_invocation(partition, invocation_id, acknowledge).await; } InputCommand::AbortAllPartition { partition } => { self.handle_abort_partition(partition); @@ -808,12 +809,13 @@ where restate.invoker.partition_leader_epoch = ?partition, ) )] - fn handle_abort_invocation( + async fn handle_abort_invocation( &mut self, partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, ) { - if let Some((_, _, mut ism)) = self + if let Some((tx, _, mut ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -823,6 +825,14 @@ where ism.abort(); self.quota.unreserve_slot(); self.status_store.on_end(&partition, &invocation_id); + if acknowledge { + let _ = tx + .send(Effect { + invocation_id, + kind: EffectKind::Failed(KILLED_INVOCATION_ERROR), + }) + .await; + } } else { trace!("Ignoring Abort command because there is no matching partition/invocation"); } @@ -1413,7 +1423,9 @@ mod tests { assert_eq!(*available_slots, 1); // Abort the invocation - service_inner.handle_abort_invocation(MOCK_PARTITION, invocation_id); + service_inner + .handle_abort_invocation(MOCK_PARTITION, invocation_id, false) + .await; // Check the quota let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota); diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index a1112c814..be134fbbf 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -17,11 +17,11 @@ use futures::Stream; use futures_util::stream; use restate_rocksdb::RocksDbPerfGuard; use restate_storage_api::invocation_status_table::{ - InvocationStatus, InvocationStatusTable, InvocationStatusV1, ReadOnlyInvocationStatusTable, + InvocationStatus, InvocationStatusTable, InvocationStatusV1, + InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, }; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; -use restate_types::invocation::InvocationTarget; use restate_types::storage::StorageCodec; use std::ops::RangeInclusive; use tracing::trace; @@ -166,10 +166,10 @@ fn delete_invocation_status(storage: &mut S, invocation_id: &I storage.delete_key(&create_invocation_status_key(invocation_id)); } -fn invoked_invocations( +fn invoked_or_killed_invocations( storage: &mut S, partition_key_range: RangeInclusive, -) -> Vec> { +) -> Vec> { let _x = RocksDbPerfGuard::new("invoked-invocations"); let mut invocations = storage.for_each_key_value_in_place( FullScanPartitionKeyRange::(partition_key_range.clone()), @@ -185,7 +185,7 @@ fn invoked_invocations( invocations.extend(storage.for_each_key_value_in_place( FullScanPartitionKeyRange::(partition_key_range), |mut k, mut v| { - let result = read_invoked_full_invocation_id(&mut k, &mut v).transpose(); + let result = read_invoked_or_killed_status_lite(&mut k, &mut v).transpose(); if let Some(res) = result { TableScanIterationDecision::Emit(res) } else { @@ -239,27 +239,41 @@ fn all_invocation_status( fn read_invoked_v1_full_invocation_id( mut k: &mut &[u8], v: &mut &[u8], -) -> Result> { +) -> Result> { let invocation_id = invocation_id_from_v1_key_bytes(&mut k)?; let invocation_status = StorageCodec::decode::(v) .map_err(|err| StorageError::Generic(err.into()))?; if let InvocationStatus::Invoked(invocation_meta) = invocation_status.0 { - Ok(Some((invocation_id, invocation_meta.invocation_target))) + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_meta.invocation_target, + is_invoked: true, + })) } else { Ok(None) } } -fn read_invoked_full_invocation_id( +fn read_invoked_or_killed_status_lite( mut k: &mut &[u8], v: &mut &[u8], -) -> Result> { +) -> Result> { // TODO this can be improved by simply parsing InvocationTarget and the Status enum let invocation_id = invocation_id_from_key_bytes(&mut k)?; let invocation_status = StorageCodec::decode::(v) .map_err(|err| StorageError::Generic(err.into()))?; if let InvocationStatus::Invoked(invocation_meta) = invocation_status { - Ok(Some((invocation_id, invocation_meta.invocation_target))) + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_meta.invocation_target, + is_invoked: true, + })) + } else if let InvocationStatus::Killed(invocation_meta) = invocation_status { + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_meta.invocation_target, + is_invoked: false, + })) } else { Ok(None) } @@ -274,10 +288,10 @@ impl ReadOnlyInvocationStatusTable for PartitionStore { get_invocation_status(self, invocation_id) } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send { - stream::iter(invoked_invocations( + ) -> impl Stream> + Send { + stream::iter(invoked_or_killed_invocations( self, self.partition_key_range().clone(), )) @@ -300,10 +314,10 @@ impl<'a> ReadOnlyInvocationStatusTable for PartitionStoreTransaction<'a> { try_migrate_and_get_invocation_status(self, invocation_id) } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send { - stream::iter(invoked_invocations( + ) -> impl Stream> + Send { + stream::iter(invoked_or_killed_invocations( self, self.partition_key_range().clone(), )) diff --git a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs index aadd219e4..7da572ea7 100644 --- a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs @@ -22,7 +22,8 @@ use googletest::prelude::*; use once_cell::sync::Lazy; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, InvocationStatusV1, - JournalMetadata, ReadOnlyInvocationStatusTable, StatusTimestamps, + InvokedOrKilledInvocationStatusLite, JournalMetadata, ReadOnlyInvocationStatusTable, + StatusTimestamps, }; use restate_storage_api::Transaction; use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey}; @@ -94,6 +95,19 @@ fn invoked_status(invocation_target: InvocationTarget) -> InvocationStatus { }) } +fn killed_status(invocation_target: InvocationTarget) -> InvocationStatus { + InvocationStatus::Killed(InFlightInvocationMetadata { + invocation_target, + journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()), + pinned_deployment: None, + response_sinks: HashSet::new(), + timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)), + source: Source::Ingress(*RPC_REQUEST_ID), + completion_retention_duration: Duration::ZERO, + idempotency_key: None, + }) +} + fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus { InvocationStatus::Suspended { metadata: InFlightInvocationMetadata { @@ -131,7 +145,7 @@ async fn populate_data(txn: &mut T) { txn.put_invocation_status( &INVOCATION_ID_4, - &invoked_status(INVOCATION_TARGET_4.clone()), + &killed_status(INVOCATION_TARGET_4.clone()), ) .await; @@ -154,22 +168,34 @@ async fn verify_point_lookups(txn: &mut T) { txn.get_invocation_status(&INVOCATION_ID_4) .await .expect("should not fail"), - invoked_status(INVOCATION_TARGET_4.clone()) + killed_status(INVOCATION_TARGET_4.clone()) ); } -async fn verify_all_svc_with_status_invoked(txn: &mut T) { +async fn verify_all_svc_with_status_invoked_or_killed(txn: &mut T) { let actual = txn - .all_invoked_invocations() + .all_invoked_or_killed_invocations() .try_collect::>() .await .unwrap(); assert_that!( actual, unordered_elements_are![ - eq((*INVOCATION_ID_1, INVOCATION_TARGET_1.clone())), - eq((*INVOCATION_ID_2, INVOCATION_TARGET_2.clone())), - eq((*INVOCATION_ID_4, INVOCATION_TARGET_4.clone())) + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_1, + invocation_target: INVOCATION_TARGET_1.clone(), + is_invoked: true, + }), + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_2, + invocation_target: INVOCATION_TARGET_2.clone(), + is_invoked: true, + }), + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_4, + invocation_target: INVOCATION_TARGET_4.clone(), + is_invoked: false, + }), ] ); } @@ -181,7 +207,7 @@ async fn test_invocation_status() { populate_data(&mut txn).await; verify_point_lookups(&mut txn).await; - verify_all_svc_with_status_invoked(&mut txn).await; + verify_all_svc_with_status_invoked_or_killed(&mut txn).await; } #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 7d793769f..3083aaf25 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -119,6 +119,7 @@ message InvocationStatusV2 { INBOXED = 2; INVOKED = 3; SUSPENDED = 4; + KILLED = 6; COMPLETED = 5; } diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index 1403885f4..677112f3d 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -189,6 +189,7 @@ pub enum InvocationStatus { metadata: InFlightInvocationMetadata, waiting_for_completed_entries: HashSet, }, + Killed(InFlightInvocationMetadata), Completed(CompletedInvocation), /// Service instance is currently not invoked #[default] @@ -203,6 +204,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.invocation_target), InvocationStatus::Invoked(metadata) => Some(&metadata.invocation_target), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.invocation_target), + InvocationStatus::Killed(metadata) => Some(&metadata.invocation_target), InvocationStatus::Completed(completed) => Some(&completed.invocation_target), _ => None, } @@ -215,6 +217,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.source), InvocationStatus::Invoked(metadata) => Some(&metadata.source), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.source), + InvocationStatus::Killed(metadata) => Some(&metadata.source), InvocationStatus::Completed(completed) => Some(&completed.source), _ => None, } @@ -227,6 +230,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => metadata.metadata.idempotency_key.as_ref(), InvocationStatus::Invoked(metadata) => metadata.idempotency_key.as_ref(), InvocationStatus::Suspended { metadata, .. } => metadata.idempotency_key.as_ref(), + InvocationStatus::Killed(metadata) => metadata.idempotency_key.as_ref(), InvocationStatus::Completed(completed) => completed.idempotency_key.as_ref(), _ => None, } @@ -237,6 +241,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(metadata.journal_metadata), _ => None, } } @@ -246,6 +251,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(&metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(&metadata.journal_metadata), _ => None, } } @@ -255,6 +261,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(&mut metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(&mut metadata.journal_metadata), _ => None, } } @@ -264,6 +271,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -273,6 +281,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -282,6 +291,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -295,6 +305,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&mut metadata.response_sinks), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.response_sinks), + InvocationStatus::Killed(metadata) => Some(&mut metadata.response_sinks), _ => None, } } @@ -306,6 +317,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&metadata.response_sinks), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.response_sinks), + InvocationStatus::Killed(metadata) => Some(&metadata.response_sinks), _ => None, } } @@ -317,6 +329,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.timestamps), InvocationStatus::Invoked(metadata) => Some(&metadata.timestamps), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.timestamps), + InvocationStatus::Killed(metadata) => Some(&metadata.timestamps), InvocationStatus::Completed(completed) => Some(&completed.timestamps), _ => None, } @@ -329,6 +342,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.timestamps), InvocationStatus::Invoked(metadata) => Some(&mut metadata.timestamps), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.timestamps), + InvocationStatus::Killed(metadata) => Some(&mut metadata.timestamps), InvocationStatus::Completed(completed) => Some(&mut completed.timestamps), _ => None, } @@ -550,15 +564,23 @@ impl CompletedInvocation { } } +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct InvokedOrKilledInvocationStatusLite { + pub invocation_id: InvocationId, + pub invocation_target: InvocationTarget, + /// If true, original status is Invoked, otherwise is Killed + pub is_invoked: bool, +} + pub trait ReadOnlyInvocationStatusTable { fn get_invocation_status( &mut self, invocation_id: &InvocationId, ) -> impl Future> + Send; - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send; + ) -> impl Stream> + Send; fn all_invocation_statuses( &self, diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 236d6ab00..7d609df1a 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -471,6 +471,28 @@ pub mod v1 { .collect(), }, ), + invocation_status_v2::Status::Killed => { + Ok(crate::invocation_status_table::InvocationStatus::Killed( + crate::invocation_status_table::InFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + journal_metadata: crate::invocation_status_table::JournalMetadata { + length: journal_length, + span_context: expect_or_fail!(span_context)?.try_into()?, + }, + pinned_deployment: derive_pinned_deployment( + deployment_id, + service_protocol_version, + )?, + source, + completion_retention_duration: completion_retention_duration + .unwrap_or_default() + .try_into()?, + idempotency_key: idempotency_key.map(ByteString::from), + }, + )) + } invocation_status_v2::Status::Completed => { Ok(crate::invocation_status_table::InvocationStatus::Completed( crate::invocation_status_table::CompletedInvocation { @@ -518,6 +540,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } @@ -570,6 +593,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } @@ -625,6 +649,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(journal_metadata.span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { @@ -689,6 +714,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(journal_metadata.span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { @@ -728,6 +754,69 @@ pub mod v1 { result: None, } } + crate::invocation_status_table::InvocationStatus::Killed( + crate::invocation_status_table::InFlightInvocationMetadata { + invocation_target, + journal_metadata, + pinned_deployment, + response_sinks, + timestamps, + source, + completion_retention_duration, + idempotency_key, + }, + ) => { + let (deployment_id, service_protocol_version) = match pinned_deployment { + None => (None, None), + Some(pinned_deployment) => ( + Some(pinned_deployment.deployment_id.to_string()), + Some(pinned_deployment.service_protocol_version.as_repr()), + ), + }; + + InvocationStatusV2 { + status: invocation_status_v2::Status::Killed.into(), + invocation_target: Some(invocation_target.into()), + source: Some(source.into()), + span_context: Some(journal_metadata.span_context.into()), + // SAFETY: We're only mapping data types here + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), + inboxed_transition_time: unsafe { + timestamps.inboxed_transition_time() + } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { + timestamps.running_transition_time() + } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), + response_sinks: response_sinks + .into_iter() + .map(|s| ServiceInvocationResponseSink::from(Some(s))) + .collect(), + argument: None, + headers: vec![], + execution_time: None, + completion_retention_duration: Some( + completion_retention_duration.into(), + ), + idempotency_key: idempotency_key.map(|key| key.to_string()), + inbox_sequence_number: None, + journal_length: journal_metadata.length, + deployment_id, + service_protocol_version, + waiting_for_completed_entries: vec![], + result: None, + } + } crate::invocation_status_table::InvocationStatus::Completed( crate::invocation_status_table::CompletedInvocation { invocation_target, @@ -743,6 +832,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } @@ -856,6 +946,9 @@ pub mod v1 { crate::invocation_status_table::InvocationStatus::Scheduled(_) => { panic!("Unexpected conversion to old InvocationStatus when using Scheduled variant. This is a bug in the table implementation.") } + crate::invocation_status_table::InvocationStatus::Killed(_) => { + panic!("Unexpected conversion to old InvocationStatus when using Killed variant. This is a bug in the table implementation.") + } }; InvocationStatus { @@ -966,6 +1059,7 @@ pub mod v1 { source, completion_retention_duration: completion_retention_time, idempotency_key, + .. } = value; let (deployment_id, service_protocol_version) = match pinned_deployment { diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index b9f65dcc7..da64a68c3 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -49,7 +49,7 @@ impl TimerKey { } } - fn neo_invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + pub fn neo_invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { TimerKey { timestamp, kind: TimerKeyKind::NeoInvoke { invocation_uuid }, diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index 5d68ec891..196bdfea8 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -83,6 +83,10 @@ pub(crate) fn append_invocation_status_row( row.status("suspended"); fill_in_flight_invocation_metadata(&mut row, output, metadata); } + InvocationStatus::Killed(metadata) => { + row.status("killed"); + fill_in_flight_invocation_metadata(&mut row, output, metadata); + } InvocationStatus::Free => { row.status("free"); } diff --git a/crates/storage-query-datafusion/src/invocation_status/schema.rs b/crates/storage-query-datafusion/src/invocation_status/schema.rs index 823939f0e..0d6dd616b 100644 --- a/crates/storage-query-datafusion/src/invocation_status/schema.rs +++ b/crates/storage-query-datafusion/src/invocation_status/schema.rs @@ -21,7 +21,7 @@ define_table!(sys_invocation_status( /// [Invocation ID](/operate/invocation#invocation-identifier). id: DataType::LargeUtf8, - /// Either `inboxed` or `scheduled` or `invoked` or `suspended` or `completed` + /// Either `inboxed` or `scheduled` or `invoked` or `suspended` or `killed` or `completed` status: DataType::LargeUtf8, /// If `status = 'completed'`, this contains either `success` or `failure` diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 04b88076f..26942fcf3 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -51,6 +51,9 @@ pub struct WorkerOptions { #[cfg_attr(feature = "schemars", schemars(skip))] experimental_feature_disable_idempotency_table: bool, + #[cfg_attr(feature = "schemars", schemars(skip))] + experimental_feature_invocation_status_killed: bool, + pub storage: StorageOptions, pub invoker: InvokerOptions, @@ -88,6 +91,10 @@ impl WorkerOptions { pub fn experimental_feature_disable_idempotency_table(&self) -> bool { self.experimental_feature_disable_idempotency_table } + + pub fn experimental_feature_invocation_status_killed(&self) -> bool { + self.experimental_feature_invocation_status_killed + } } impl Default for WorkerOptions { @@ -97,6 +104,7 @@ impl Default for WorkerOptions { num_timers_in_memory_limit: None, cleanup_interval: Duration::from_secs(60 * 60).into(), experimental_feature_disable_idempotency_table: false, + experimental_feature_invocation_status_killed: false, storage: StorageOptions::default(), invoker: Default::default(), max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"), diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 89c85ee4d..8a7eadde0 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -54,6 +54,7 @@ codederror = { workspace = true } derive_builder = { workspace = true } derive_more = { workspace = true } futures = { workspace = true } +enumset = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } metrics = { workspace = true } diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index 2bea1b639..884a22866 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -171,9 +171,9 @@ mod tests { use restate_core::{Metadata, TaskCenter, TaskKind, TestCoreEnvBuilder}; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InvocationStatus, + InvokedOrKilledInvocationStatusLite, }; use restate_types::identifiers::{InvocationId, InvocationUuid}; - use restate_types::invocation::InvocationTarget; use restate_types::partition_table::{FindPartition, PartitionTable}; use restate_types::Version; use std::future::Future; @@ -192,9 +192,9 @@ mod tests { std::future::pending() } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send + ) -> impl Stream> + Send { todo!(); #[allow(unreachable_code)] diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 2ab1e9815..bea89fdbf 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -11,7 +11,7 @@ use crate::metric_definitions::{PARTITION_ACTUATOR_HANDLED, PARTITION_HANDLE_LEADER_ACTIONS}; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::self_proposer::SelfProposer; -use crate::partition::leadership::{ActionEffect, Error, TimerService}; +use crate::partition::leadership::{ActionEffect, Error, InvokerStream, TimerService}; use crate::partition::shuffle::HintSender; use crate::partition::state_machine::Action; use crate::partition::{respond_to_rpc, shuffle}; @@ -67,7 +67,7 @@ pub struct LeaderState { >, awaiting_rpc_self_propose: FuturesUnordered, - invoker_stream: ReceiverStream, + invoker_stream: InvokerStream, shuffle_stream: ReceiverStream, pub pending_cleanup_timers_to_schedule: VecDeque<(InvocationId, Duration)>, cleaner_task_id: TaskId, @@ -84,7 +84,7 @@ impl LeaderState { shuffle_hint_tx: HintSender, timer_service: TimerService, self_proposer: SelfProposer, - invoker_rx: tokio::sync::mpsc::Receiver, + invoker_rx: InvokerStream, shuffle_rx: tokio::sync::mpsc::Receiver, ) -> Self { LeaderState { @@ -99,7 +99,7 @@ impl LeaderState { self_proposer, awaiting_rpc_actions: Default::default(), awaiting_rpc_self_propose: Default::default(), - invoker_stream: ReceiverStream::new(invoker_rx), + invoker_stream: invoker_rx, shuffle_stream: ReceiverStream::new(shuffle_rx), pending_cleanup_timers_to_schedule: Default::default(), } @@ -392,8 +392,11 @@ impl LeaderState { .notify_completion(partition_leader_epoch, invocation_id, completion) .await .map_err(Error::Invoker)?, - Action::AbortInvocation(invocation_id) => invoker_tx - .abort_invocation(partition_leader_epoch, invocation_id) + Action::AbortInvocation { + invocation_id, + acknowledge, + } => invoker_tx + .abort_invocation(partition_leader_epoch, invocation_id, acknowledge) .await .map_err(Error::Invoker)?, Action::IngressResponse { diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 47a9377d7..105e5a6a4 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -17,8 +17,9 @@ use std::mem; use std::ops::RangeInclusive; use std::time::Duration; -use futures::{StreamExt, TryStreamExt}; +use futures::{stream, StreamExt, TryStreamExt}; use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, instrument, warn}; use restate_bifrost::Bifrost; @@ -28,11 +29,13 @@ use restate_errors::NotRunningError; use restate_invoker_api::InvokeInputJournal; use restate_partition_store::PartitionStore; use restate_storage_api::deduplication_table::EpochSequenceNumber; -use restate_storage_api::invocation_status_table::ReadOnlyInvocationStatusTable; +use restate_storage_api::invocation_status_table::{ + InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, +}; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; use restate_storage_api::timer_table::{TimerKey, TimerTable}; use restate_timer::TokioClock; -use restate_types::errors::GenericError; +use restate_types::errors::{GenericError, KILLED_INVOCATION_ERROR}; use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; use restate_types::message::MessageIndex; @@ -50,9 +53,12 @@ use crate::partition::leadership::leader_state::LeaderState; use crate::partition::leadership::self_proposer::SelfProposer; use crate::partition::shuffle::{OutboxReaderError, Shuffle, ShuffleMetadata}; use crate::partition::state_machine::Action; +use crate::partition::types::{InvokerEffect, InvokerEffectKind}; use crate::partition::{respond_to_rpc, shuffle}; type TimerService = restate_timer::TimerService; +type InvokerStream = + stream::Chain>, ReceiverStream>; #[derive(Debug, thiserror::Error)] pub(crate) enum Error { @@ -385,9 +391,11 @@ where partition_key_range: RangeInclusive, partition_store: &mut PartitionStore, channel_size: usize, - ) -> Result, Error> { + ) -> Result { let (invoker_tx, invoker_rx) = mpsc::channel(channel_size); + let mut killed_invocations_effects = vec![]; + invoker_handle .register_partition( partition_leader_epoch, @@ -399,27 +407,45 @@ where .map_err(Error::Invoker)?; { - let invoked_invocations = partition_store.all_invoked_invocations(); - tokio::pin!(invoked_invocations); + let invoked_or_killed_invocations = partition_store.all_invoked_or_killed_invocations(); + tokio::pin!(invoked_or_killed_invocations); let mut count = 0; - while let Some(invocation_id_and_target) = invoked_invocations.next().await { - let (invocation_id, invocation_target) = invocation_id_and_target?; - invoker_handle - .invoke( - partition_leader_epoch, + while let Some(invoked_or_killed_invocation) = + invoked_or_killed_invocations.next().await + { + let InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target, + is_invoked, + } = invoked_or_killed_invocation?; + if is_invoked { + invoker_handle + .invoke( + partition_leader_epoch, + invocation_id, + invocation_target, + InvokeInputJournal::NoCachedJournal, + ) + .await + .map_err(Error::Invoker)?; + } else { + // For killed invocations, there's no need to go through the invoker + // We simply return here the effect as if the invoker produced that. + killed_invocations_effects.push(InvokerEffect { invocation_id, - invocation_target, - InvokeInputJournal::NoCachedJournal, - ) - .await - .map_err(Error::Invoker)?; + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + }); + } count += 1; } debug!("Leader partition resumed {} invocations", count); } - Ok(invoker_rx) + Ok( + futures::stream::iter(killed_invocations_effects) + .chain(ReceiverStream::new(invoker_rx)), + ) } async fn become_follower(&mut self) { diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index e9327bb03..571812edd 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -15,6 +15,7 @@ use std::time::{Duration, Instant}; use anyhow::Context; use assert2::let_assert; +use enumset::EnumSet; use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _}; use metrics::histogram; use tokio::sync::{mpsc, watch}; @@ -41,6 +42,7 @@ use restate_storage_api::service_status_table::{ use restate_storage_api::{StorageError, Transaction}; use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode}; use restate_types::config::WorkerOptions; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::identifiers::{ LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; @@ -68,7 +70,7 @@ use crate::metric_definitions::{ }; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata}; -use crate::partition::state_machine::{ActionCollector, StateMachine}; +use crate::partition::state_machine::{ActionCollector, ExperimentalFeature, StateMachine}; mod cleaner; pub mod invoker_storage_reader; @@ -91,6 +93,7 @@ pub(super) struct PartitionProcessorBuilder { num_timers_in_memory_limit: Option, disable_idempotency_table: bool, + invocation_status_killed: bool, cleanup_interval: Duration, channel_size: usize, max_command_batch_size: usize, @@ -124,6 +127,7 @@ where status, num_timers_in_memory_limit: options.num_timers_in_memory_limit(), disable_idempotency_table: options.experimental_feature_disable_idempotency_table(), + invocation_status_killed: options.experimental_feature_invocation_status_killed(), cleanup_interval: options.cleanup_interval(), channel_size: options.internal_queue_length(), max_command_batch_size: options.max_command_batch_size(), @@ -145,6 +149,7 @@ where num_timers_in_memory_limit, cleanup_interval, disable_idempotency_table, + invocation_status_killed, channel_size, max_command_batch_size, invoker_tx, @@ -159,6 +164,7 @@ where &mut partition_store, partition_key_range.clone(), disable_idempotency_table, + invocation_status_killed, ) .await?; @@ -202,6 +208,7 @@ where partition_store: &mut PartitionStore, partition_key_range: RangeInclusive, disable_idempotency_table: bool, + invocation_status_killed: bool, ) -> Result, StorageError> where Codec: RawEntryCodec + Default + Debug, @@ -210,12 +217,20 @@ where let outbox_seq_number = partition_store.get_outbox_seq_number().await?; let outbox_head_seq_number = partition_store.get_outbox_head_seq_number().await?; + let mut experimental_features = EnumSet::empty(); + if disable_idempotency_table { + experimental_features |= ExperimentalFeature::DisableIdempotencyTable; + } + if invocation_status_killed { + experimental_features |= ExperimentalFeature::InvocationStatusKilled; + } + let state_machine = StateMachine::new( inbox_seq_number, outbox_seq_number, outbox_head_seq_number, partition_key_range, - disable_idempotency_table, + experimental_features, ); Ok(state_machine) @@ -681,6 +696,14 @@ where completion_expiry_time, })) } + InvocationStatus::Killed(_) => { + Ok(PartitionProcessorRpcResponse::Output(InvocationOutput { + request_id, + response: IngressResponseResult::Failure(KILLED_INVOCATION_ERROR), + invocation_id: Some(invocation_id), + completion_expiry_time: None, + })) + } _ => Ok(PartitionProcessorRpcResponse::NotReady), } } diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index ac3b3a53d..feefbc3de 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -47,7 +47,10 @@ pub enum Action { invocation_id: InvocationId, completion: Completion, }, - AbortInvocation(InvocationId), + AbortInvocation { + invocation_id: InvocationId, + acknowledge: bool, + }, IngressResponse { request_id: PartitionProcessorRpcRequestId, invocation_id: Option, diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index b4ae62f63..8c9906aaf 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -18,6 +18,7 @@ pub use actions::{Action, ActionCollector}; use assert2::let_assert; use bytes::Bytes; use bytestring::ByteString; +use enumset::EnumSet; use futures::{StreamExt, TryStreamExt}; use metrics::{histogram, Histogram}; use restate_invoker_api::InvokeInputJournal; @@ -45,10 +46,9 @@ use restate_storage_api::Result as StorageResult; use restate_tracing_instrumentation as instrumentation; use restate_types::deployment::PinnedDeployment; use restate_types::errors::{ - InvocationError, InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR, - ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, CANCELED_INVOCATION_ERROR, KILLED_INVOCATION_ERROR, - NOT_FOUND_INVOCATION_ERROR, NOT_READY_INVOCATION_ERROR, - WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR, + InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR, ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, + CANCELED_INVOCATION_ERROR, KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR, + NOT_READY_INVOCATION_ERROR, WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR, }; use restate_types::identifiers::{ EntryIndex, InvocationId, PartitionKey, PartitionProcessorRpcRequestId, ServiceId, @@ -90,6 +90,16 @@ use std::time::Instant; use tracing::error; use utils::SpanExt; +#[derive(Debug, Hash, enumset::EnumSetType, strum::Display)] +pub enum ExperimentalFeature { + /// This is used to disable writing to idempotency table/virtual object status table for idempotent invocations/workflow invocations. + /// From Restate 1.2 invocation ids are generated deterministically, so this additional index is not needed. + DisableIdempotencyTable, + /// If true, kill should wait for end signal from invoker, in order to implement the restart functionality. + /// This is enabled by experimental_feature_kill_and_restart. + InvocationStatusKilled, +} + pub struct StateMachine { // initialized from persistent storage inbox_seq_number: MessageIndex, @@ -100,9 +110,8 @@ pub struct StateMachine { partition_key_range: RangeInclusive, latency: Histogram, - /// This is used to disable writing to idempotency table/virtual object status table for idempotent invocations/workflow invocations. - /// From Restate 1.2 invocation ids are generated deterministically, so this additional index is not needed. - disable_idempotency_table: bool, + /// Enabled experimental features. + experimental_features: EnumSet, _codec: PhantomData, } @@ -160,7 +169,7 @@ impl StateMachine { outbox_seq_number: MessageIndex, outbox_head_seq_number: Option, partition_key_range: RangeInclusive, - disable_idempotency_table: bool, + experimental_features: EnumSet, ) -> Self { let latency = histogram!(crate::metric_definitions::PARTITION_HANDLE_INVOKER_EFFECT_COMMAND); @@ -170,7 +179,7 @@ impl StateMachine { outbox_head_seq_number, partition_key_range, latency, - disable_idempotency_table, + experimental_features, _codec: PhantomData, } } @@ -287,11 +296,11 @@ impl StateMachine { } Command::Timer(timer) => self.on_timer(&mut ctx, timer).await, Command::TerminateInvocation(invocation_termination) => { - self.try_terminate_invocation(&mut ctx, invocation_termination) + self.on_terminate_invocation(&mut ctx, invocation_termination) .await } Command::PurgeInvocation(purge_invocation_request) => { - self.try_purge_invocation(&mut ctx, purge_invocation_request.invocation_id) + self.on_purge_invocation(&mut ctx, purge_invocation_request.invocation_id) .await } Command::PatchState(mutation) => { @@ -482,7 +491,11 @@ impl StateMachine { // Store the invocation id mapping if we have to and continue the processing // TODO get rid of this code when we remove the usage of the virtual object table for workflows - if is_workflow_run && !self.disable_idempotency_table { + if is_workflow_run + && !self + .experimental_features + .contains(ExperimentalFeature::DisableIdempotencyTable) + { ctx.storage .put_virtual_object_status( &service_invocation @@ -494,7 +507,11 @@ impl StateMachine { .await; } // TODO get rid of this code when we remove the idempotency table - if has_idempotency_key && !self.disable_idempotency_table { + if has_idempotency_key + && !self + .experimental_features + .contains(ExperimentalFeature::DisableIdempotencyTable) + { Self::do_store_idempotency_id( ctx, service_invocation @@ -564,6 +581,17 @@ impl StateMachine { } } } + InvocationStatus::Killed(metadata) => { + self.send_response_to_sinks( + ctx, + service_invocation.response_sink.take().into_iter(), + KILLED_INVOCATION_ERROR, + Some(invocation_id), + None, + Some(&metadata.invocation_target), + ) + .await?; + } InvocationStatus::Completed(completed) => { // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. let completion_expiry_time = unsafe { completed.completion_expiry_time() }; @@ -810,7 +838,7 @@ impl StateMachine { Ok(()) } - async fn try_terminate_invocation< + async fn on_terminate_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -828,12 +856,12 @@ impl StateMachine { }: InvocationTermination, ) -> Result<(), Error> { match termination_flavor { - TerminationFlavor::Kill => self.try_kill_invocation(ctx, invocation_id).await, - TerminationFlavor::Cancel => self.try_cancel_invocation(ctx, invocation_id).await, + TerminationFlavor::Kill => self.on_kill_invocation(ctx, invocation_id).await, + TerminationFlavor::Cancel => self.on_cancel_invocation(ctx, invocation_id).await, } } - async fn try_kill_invocation< + async fn on_kill_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -841,6 +869,7 @@ impl StateMachine { + StateTable + JournalTable + OutboxTable + + TimerTable + FsmTable, >( &mut self, @@ -850,8 +879,13 @@ impl StateMachine { let status = ctx.get_invocation_status(&invocation_id).await?; match status { - InvocationStatus::Invoked(metadata) | InvocationStatus::Suspended { metadata, .. } => { - self.kill_invocation(ctx, invocation_id, metadata).await?; + InvocationStatus::Invoked(metadata) => { + self.kill_invoked_invocation(ctx, invocation_id, metadata) + .await?; + } + InvocationStatus::Suspended { metadata, .. } => { + self.kill_suspended_invocation(ctx, invocation_id, metadata) + .await?; } InvocationStatus::Inboxed(inboxed) => { self.terminate_inboxed_invocation( @@ -862,7 +896,24 @@ impl StateMachine { ) .await? } - _ => { + InvocationStatus::Scheduled(scheduled) => { + self.terminate_scheduled_invocation( + ctx, + TerminationFlavor::Kill, + invocation_id, + scheduled, + ) + .await? + } + InvocationStatus::Killed(_) => { + trace!("Received kill command for an already killed invocation with id '{invocation_id}'."); + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + debug!("Received kill command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { trace!("Received kill command for unknown invocation with id '{invocation_id}'."); // We still try to send the abort signal to the invoker, // as it might be the case that previously the user sent an abort signal @@ -870,14 +921,14 @@ impl StateMachine { // This can happen because the invoke/resume and the abort invoker messages end up in different queues, // and the abort message can overtake the invoke/resume. // Consequently the invoker might have not received the abort and the user tried to send it again. - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); } }; Ok(()) } - async fn try_cancel_invocation< + async fn on_cancel_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -928,7 +979,26 @@ impl StateMachine { ) .await? } - _ => { + InvocationStatus::Scheduled(scheduled) => { + self.terminate_scheduled_invocation( + ctx, + TerminationFlavor::Cancel, + invocation_id, + scheduled, + ) + .await? + } + InvocationStatus::Killed(_) => { + trace!( + "Received cancel command for an already killed invocation '{invocation_id}'." + ); + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + debug!("Received cancel command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { trace!("Received cancel command for unknown invocation with id '{invocation_id}'."); // We still try to send the abort signal to the invoker, // as it might be the case that previously the user sent an abort signal @@ -936,7 +1006,7 @@ impl StateMachine { // This can happen because the invoke/resume and the abort invoker messages end up in different queues, // and the abort message can overtake the invoke/resume. // Consequently the invoker might have not received the abort and the user tried to send it again. - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); } }; @@ -1002,7 +1072,67 @@ impl StateMachine { Ok(()) } - async fn kill_invocation< + async fn terminate_scheduled_invocation< + State: InvocationStatusTable + TimerTable + OutboxTable + FsmTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + termination_flavor: TerminationFlavor, + invocation_id: InvocationId, + scheduled_invocation: ScheduledInvocation, + ) -> Result<(), Error> { + let error = match termination_flavor { + TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + }; + + let ScheduledInvocation { + metadata: + PreFlightInvocationMetadata { + response_sinks, + span_context, + invocation_target, + execution_time, + .. + }, + } = scheduled_invocation; + + // Reply back to callers with error, and publish end trace + self.send_response_to_sinks( + ctx, + response_sinks, + &error, + Some(invocation_id), + None, + Some(&invocation_target), + ) + .await?; + + // Delete timer + if let Some(execution_time) = execution_time { + Self::do_delete_timer( + ctx, + TimerKey::neo_invoke(execution_time.as_u64(), invocation_id.invocation_uuid()), + ) + .await?; + } else { + warn!("Scheduled invocations must always have an execution time."); + } + Self::do_free_invocation(ctx, invocation_id).await; + + self.notify_invocation_result( + ctx, + invocation_id, + invocation_target, + span_context, + MillisSinceEpoch::now(), + Err((error.code(), error.to_string())), + ); + + Ok(()) + } + + async fn kill_invoked_invocation< State: InboxTable + VirtualObjectStatusTable + InvocationStatusTable @@ -1020,9 +1150,61 @@ impl StateMachine { self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length) .await?; - self.fail_invocation(ctx, invocation_id, metadata, KILLED_INVOCATION_ERROR) + if self + .experimental_features + .contains(ExperimentalFeature::InvocationStatusKilled) + { + debug_if_leader!( + ctx.is_leader, + restate.invocation.id = %invocation_id, + "Effect: Store killed invocation" + ); + + ctx.storage + .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata)) + .await; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } else { + self.end_invocation( + ctx, + invocation_id, + metadata, + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), + ) + .await?; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + } + Ok(()) + } + + async fn kill_suspended_invocation< + State: InboxTable + + VirtualObjectStatusTable + + InvocationStatusTable + + VirtualObjectStatusTable + + StateTable + + JournalTable + + OutboxTable + + FsmTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + metadata: InFlightInvocationMetadata, + ) -> Result<(), Error> { + self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length) .await?; - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + + // No need to go through the Killed state when we're suspended, + // because it means we already got a terminal state from the invoker. + self.end_invocation( + ctx, + invocation_id, + metadata, + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), + ) + .await?; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); Ok(()) } @@ -1179,7 +1361,7 @@ impl StateMachine { } } - async fn try_purge_invocation< + async fn on_purge_invocation< State: InvocationStatusTable + IdempotencyTable + VirtualObjectStatusTable @@ -1276,7 +1458,7 @@ impl StateMachine { self.on_service_invocation(ctx, service_invocation).await } Timer::CleanInvocationStatus(invocation_id) => { - self.try_purge_invocation(ctx, invocation_id).await + self.on_purge_invocation(ctx, invocation_id).await } Timer::NeoInvoke(invocation_id) => self.on_neo_invoke_timer(ctx, invocation_id).await, } @@ -1355,17 +1537,7 @@ impl StateMachine { let status = ctx .get_invocation_status(&invoker_effect.invocation_id) .await?; - - match status { - InvocationStatus::Invoked(invocation_metadata) => { - self.on_invoker_effect(ctx, invoker_effect, invocation_metadata) - .await? - } - _ => { - trace!("Received invoker effect for unknown service invocation. Ignoring the effect and aborting."); - Self::do_send_abort_invocation_to_invoker(ctx, invoker_effect.invocation_id); - } - }; + self.on_invoker_effect(ctx, invoker_effect, status).await?; self.latency.record(start.elapsed()); Ok(()) @@ -1388,8 +1560,29 @@ impl StateMachine { invocation_id, kind, }: InvokerEffect, - invocation_metadata: InFlightInvocationMetadata, + invocation_status: InvocationStatus, ) -> Result<(), Error> { + let is_status_invoked = matches!(invocation_status, InvocationStatus::Invoked(_)); + let is_status_killed = matches!(invocation_status, InvocationStatus::Killed(_)); + + if !is_status_invoked && !is_status_killed { + trace!("Received invoker effect for invocation not in invoked nor killed status. Ignoring the effect."); + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + return Ok(()); + } + if is_status_killed + && !matches!(kind, InvokerEffectKind::Failed(_) | InvokerEffectKind::End) + { + warn!( + "Received non terminal invoker effect for killed invocation. Ignoring the effect." + ); + return Ok(()); + } + + let invocation_metadata = invocation_status + .into_invocation_metadata() + .expect("Must be present if status is killed or invoked"); + match kind { InvokerEffectKind::PinnedDeployment(pinned_deployment) => { Self::do_store_pinned_deployment( @@ -1447,12 +1640,27 @@ impl StateMachine { } } InvokerEffectKind::End => { - self.end_invocation(ctx, invocation_id, invocation_metadata) - .await?; + self.end_invocation( + ctx, + invocation_id, + invocation_metadata, + if is_status_killed { + // It doesn't matter that the invocation successfully completed, we return failed anyway in this case. + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + } else { + None + }, + ) + .await?; } InvokerEffectKind::Failed(e) => { - self.fail_invocation(ctx, invocation_id, invocation_metadata, e) - .await?; + self.end_invocation( + ctx, + invocation_id, + invocation_metadata, + Some(ResponseResult::Failure(e)), + ) + .await?; } } @@ -1472,26 +1680,19 @@ impl StateMachine { ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, invocation_metadata: InFlightInvocationMetadata, + // If given, this will override any Output Entry available in the journal table + response_result_override: Option, ) -> Result<(), Error> { + let invocation_target = invocation_metadata.invocation_target.clone(); let journal_length = invocation_metadata.journal_metadata.length; let completion_retention_time = invocation_metadata.completion_retention_duration; - self.notify_invocation_result( - ctx, - invocation_id, - invocation_metadata.invocation_target.clone(), - invocation_metadata.journal_metadata.span_context.clone(), - unsafe { invocation_metadata.timestamps.creation_time() }, - Ok(()), - ); - - // Pop from inbox - Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; - // If there are any response sinks, or we need to store back the completed status, // we need to find the latest output entry if !invocation_metadata.response_sinks.is_empty() || !completion_retention_time.is_zero() { - let result = if let Some(output_entry) = self + let response_result = if let Some(response_result) = response_result_override { + response_result + } else if let Some(output_entry) = self .read_last_output_entry(ctx, &invocation_id, journal_length) .await? { @@ -1506,21 +1707,46 @@ impl StateMachine { self.send_response_to_sinks( ctx, invocation_metadata.response_sinks.clone(), - result.clone(), + response_result.clone(), Some(invocation_id), None, Some(&invocation_metadata.invocation_target), ) .await?; + // Notify invocation result + self.notify_invocation_result( + ctx, + invocation_id, + invocation_metadata.invocation_target.clone(), + invocation_metadata.journal_metadata.span_context.clone(), + // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. + unsafe { invocation_metadata.timestamps.creation_time() }, + match &response_result { + ResponseResult::Success(_) => Ok(()), + ResponseResult::Failure(err) => Err((err.code(), err.message().to_owned())), + }, + ); + // Store the completed status, if needed if !completion_retention_time.is_zero() { let completed_invocation = CompletedInvocation::from_in_flight_invocation_metadata( invocation_metadata, - result, + response_result, ); Self::do_store_completed_invocation(ctx, invocation_id, completed_invocation).await; } + } else { + // Just notify Ok, no need to read the output entry + self.notify_invocation_result( + ctx, + invocation_id, + invocation_target.clone(), + invocation_metadata.journal_metadata.span_context.clone(), + // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. + unsafe { invocation_metadata.timestamps.creation_time() }, + Ok(()), + ); } // If no retention, immediately cleanup the invocation status @@ -1529,64 +1755,8 @@ impl StateMachine { } Self::do_drop_journal(ctx, invocation_id, journal_length).await; - Ok(()) - } - - async fn fail_invocation< - State: InboxTable - + VirtualObjectStatusTable - + InvocationStatusTable - + VirtualObjectStatusTable - + StateTable - + JournalTable - + OutboxTable - + FsmTable, - >( - &mut self, - ctx: &mut StateMachineApplyContext<'_, State>, - invocation_id: InvocationId, - invocation_metadata: InFlightInvocationMetadata, - error: InvocationError, - ) -> Result<(), Error> { - let journal_length = invocation_metadata.journal_metadata.length; - - self.notify_invocation_result( - ctx, - invocation_id, - invocation_metadata.invocation_target.clone(), - invocation_metadata.journal_metadata.span_context.clone(), - unsafe { invocation_metadata.timestamps.creation_time() }, - Err((error.code(), error.to_string())), - ); - - let response_result = ResponseResult::from(error); - - // Send responses out - self.send_response_to_sinks( - ctx, - invocation_metadata.response_sinks.clone(), - response_result.clone(), - Some(invocation_id), - None, - Some(&invocation_metadata.invocation_target), - ) - .await?; - - // Pop from inbox - Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; - - // Store the completed status or free it - if !invocation_metadata.completion_retention_duration.is_zero() { - let completed_invocation = CompletedInvocation::from_in_flight_invocation_metadata( - invocation_metadata, - response_result, - ); - Self::do_store_completed_invocation(ctx, invocation_id, completed_invocation).await; - } else { - Self::do_free_invocation(ctx, invocation_id).await; - } - - Self::do_drop_journal(ctx, invocation_id, journal_length).await; + // Consume inbox and move on + Self::consume_inbox(ctx, &invocation_target).await?; Ok(()) } @@ -2785,6 +2955,17 @@ impl StateMachine { .await?; } } + InvocationStatus::Killed(metadata) => { + self.send_response_to_sinks( + ctx, + vec![attach_invocation_request.response_sink], + KILLED_INVOCATION_ERROR, + Some(invocation_id), + None, + Some(&metadata.invocation_target), + ) + .await?; + } InvocationStatus::Completed(completed) => { // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. let completion_expiry_time = unsafe { completed.completion_expiry_time() }; @@ -3487,11 +3668,14 @@ impl StateMachine { fn do_send_abort_invocation_to_invoker( ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, + acknowledge: bool, ) { debug_if_leader!(ctx.is_leader, restate.invocation.id = %invocation_id, "Effect: Send abort command to invoker"); - ctx.action_collector - .push(Action::AbortInvocation(invocation_id)); + ctx.action_collector.push(Action::AbortInvocation { + invocation_id, + acknowledge, + }); } async fn do_mutate_state( diff --git a/crates/worker/src/partition/state_machine/tests/idempotency.rs b/crates/worker/src/partition/state_machine/tests/idempotency.rs index 66fb2c8a6..07e947433 100644 --- a/crates/worker/src/partition/state_machine/tests/idempotency.rs +++ b/crates/worker/src/partition/state_machine/tests/idempotency.rs @@ -25,11 +25,13 @@ use rstest::*; use std::time::Duration; #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn start_and_complete_idempotent_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -59,7 +61,7 @@ async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_ta ); // Assert idempotency key mapping exists only with idempotency table writes enabled - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -131,13 +133,13 @@ async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_ta } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] async fn start_and_complete_idempotent_invocation_neo_table( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -167,7 +169,7 @@ async fn start_and_complete_idempotent_invocation_neo_table( ); // Assert idempotency key mapping exists only with idempotency table writes enabled - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -243,11 +245,13 @@ async fn start_and_complete_idempotent_invocation_neo_table( } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn complete_already_completed_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn complete_already_completed_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); @@ -302,13 +306,13 @@ async fn complete_already_completed_invocation(#[case] disable_idempotency_table } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] async fn attach_with_service_invocation_command_while_executing( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -399,16 +403,16 @@ async fn attach_with_service_invocation_command_while_executing( } #[rstest] -#[case(true, true)] -#[case(true, false)] -#[case(false, true)] -#[case(false, false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into(), true)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into(), false)] +#[case(EnumSet::empty(), true)] +#[case(EnumSet::empty(), false)] #[restate_core::test] async fn attach_with_send_service_invocation( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, #[case] use_same_request_id: bool, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -524,11 +528,13 @@ async fn attach_with_send_service_invocation( } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn attach_inboxed_with_send_service_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_inboxed_with_send_service_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let request_id_1 = PartitionProcessorRpcRequestId::default(); @@ -620,11 +626,11 @@ async fn attach_inboxed_with_send_service_invocation(#[case] disable_idempotency } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn attach_command(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_command(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let completion_retention = Duration::from_secs(60) * 60 * 24; @@ -773,11 +779,13 @@ async fn attach_command_without_blocking_inflight() { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn purge_completed_idempotent_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn purge_completed_idempotent_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index 069d1ee39..81675c136 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -14,17 +14,24 @@ use assert2::assert; use assert2::let_assert; use googletest::any; use prost::Message; +use restate_storage_api::invocation_status_table::JournalMetadata; use restate_storage_api::journal_table::JournalTable; use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable}; use restate_types::identifiers::EntryIndex; use restate_types::invocation::TerminationFlavor; use restate_types::journal::enriched::EnrichedEntryHeader; use restate_types::service_protocol; +use rstest::rstest; use test_log::test; -#[test(restate_core::test)] -async fn kill_inboxed_invocation() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into())] +#[case(EnumSet::empty())] +#[restate_core::test] +async fn kill_inboxed_invocation( + #[case] experimental_features: EnumSet, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -76,41 +83,103 @@ async fn kill_inboxed_invocation() -> anyhow::Result<()> { // assert that invocation status was removed assert!(let InvocationStatus::Free = current_invocation_status); - fn outbox_message_matcher( - caller_id: InvocationId, - ) -> impl Matcher { - pat!( - restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( - restate_types::invocation::InvocationResponse { - id: eq(caller_id), - entry_index: eq(0), - result: eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) - } - )) - ) - } - assert_that!( actions, - contains(pat!(Action::NewOutboxMessage { - message: outbox_message_matcher(caller_id) - })) + contains( + matchers::actions::invocation_response_to_partition_processor( + caller_id, + 0, + eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + ) + ) ); let outbox_message = test_env.storage().get_next_outbox_message(0).await?; assert_that!( outbox_message, - some((ge(0), outbox_message_matcher(caller_id))) + some(( + ge(0), + matchers::outbox::invocation_response_to_partition_processor( + caller_id, + 0, + eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + ) + )) ); test_env.shutdown().await; Ok(()) } -#[test(restate_core::test)] -async fn kill_call_tree() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; +#[rstest] +// No need to test Invocation status killed experimental feature with cancel, as it has no impact +#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::Kill)] +#[case(EnumSet::empty(), TerminationFlavor::Kill)] +#[case(EnumSet::empty(), TerminationFlavor::Cancel)] +#[restate_core::test] +async fn terminate_scheduled_invocation( + #[case] experimental_features: EnumSet, + #[case] termination_flavor: TerminationFlavor, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; + + let invocation_id = InvocationId::mock_random(); + let rpc_id = PartitionProcessorRpcRequestId::new(); + + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + execution_time: Some(MillisSinceEpoch::MAX), + response_sink: Some(ServiceInvocationResponseSink::ingress(rpc_id)), + ..ServiceInvocation::mock() + })) + .await; + + // assert that inboxed invocation is in invocation_status + let current_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Scheduled(_) = current_invocation_status); + + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: termination_flavor, + })) + .await; + assert_that!( + actions, + contains(pat!(Action::IngressResponse { + request_id: eq(rpc_id), + invocation_id: some(eq(invocation_id)), + response: eq(IngressResponseResult::Failure(match termination_flavor { + TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + })) + })) + ); + + // assert that invocation status was removed + let current_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Free = current_invocation_status); + + test_env.shutdown().await; + Ok(()) +} + +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into())] +#[case(EnumSet::empty())] +#[restate_core::test] +async fn kill_call_tree( + #[case] experimental_features: EnumSet, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let call_invocation_id = InvocationId::mock_random(); let background_call_invocation_id = InvocationId::mock_random(); @@ -170,31 +239,23 @@ async fn kill_call_tree() -> anyhow::Result<()> { ))) .await; - // Invocation should be gone - assert_that!( - test_env - .storage - .get_invocation_status(&invocation_id) - .await?, - pat!(InvocationStatus::Free) - ); - assert_that!( - test_env - .storage - .get_journal(&invocation_id, 4) - .try_collect::>() - .await?, - empty() - ); + let abort_command_matcher = + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + pat!(Action::AbortInvocation { + invocation_id: eq(invocation_id), + acknowledge: eq(true) + }) + } else { + pat!(Action::AbortInvocation { + invocation_id: eq(invocation_id), + acknowledge: eq(false) + }) + }; assert_that!( actions, all!( - contains(pat!(Action::AbortInvocation(eq(invocation_id)))), - contains(pat!(Action::Invoke { - invocation_id: eq(enqueued_invocation_id_on_same_target), - invocation_target: eq(invocation_target) - })), + contains(abort_command_matcher), contains(matchers::actions::terminate_invocation( call_invocation_id, TerminationFlavor::Kill @@ -213,6 +274,92 @@ async fn kill_call_tree() -> anyhow::Result<()> { }))) ) ); + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + // We don't pop the inbox yet, but only after invocation ends + assert_that!( + actions, + not(contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target.clone(), + ))) + ) + } else { + // Inbox should have been popped + assert_that!( + actions, + contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target.clone(), + )) + ) + }; + + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + // A couple of new expectations here: + // * the invocation status is now in killed state + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Killed { .. }) + ); + + // * No new journal entries will be accepted! + let _ = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 4, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::ClearAllState), + }, + })) + .await; + // Journal entry was ignored (journal length == 4) + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Killed(pat!(InFlightInvocationMetadata { + journal_metadata: pat!(JournalMetadata { length: eq(4) }) + }))) + ); + + // Now send the Failed invoker effect + let actions = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + })) + .await; + + // The inbox is popped after the invoker sends failed + assert_that!( + actions, + contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target + )) + ); + } + + // Invocation should be finally gone + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Free) + ); + assert_that!( + test_env + .storage + .get_journal(&invocation_id, 4) + .try_collect::>() + .await?, + empty() + ); test_env.shutdown().await; Ok(()) diff --git a/crates/worker/src/partition/state_machine/tests/matchers.rs b/crates/worker/src/partition/state_machine/tests/matchers.rs index 2402b3e84..eee3267af 100644 --- a/crates/worker/src/partition/state_machine/tests/matchers.rs +++ b/crates/worker/src/partition/state_machine/tests/matchers.rs @@ -52,7 +52,7 @@ pub mod actions { use crate::partition::state_machine::Action; use restate_types::identifiers::InvocationId; - use restate_types::invocation::{InvocationResponse, ResponseResult}; + use restate_types::invocation::{InvocationTarget, ResponseResult}; pub fn invoke_for_id(invocation_id: InvocationId) -> impl Matcher { pat!(Action::Invoke { @@ -60,6 +60,16 @@ pub mod actions { }) } + pub fn invoke_for_id_and_target( + invocation_id: InvocationId, + invocation_target: InvocationTarget, + ) -> impl Matcher { + pat!(Action::Invoke { + invocation_id: eq(invocation_id), + invocation_target: eq(invocation_target) + }) + } + pub fn delete_sleep_timer(entry_index: EntryIndex) -> impl Matcher { pat!(Action::DeleteTimer { timer_key: pat!(TimerKey { @@ -109,19 +119,39 @@ pub mod actions { response_result_matcher: impl Matcher + 'static, ) -> impl Matcher { pat!(Action::NewOutboxMessage { - message: pat!( - restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( - InvocationResponse { - id: eq(caller_invocation_id), - entry_index: eq(caller_entry_index), - result: response_result_matcher - } - )) + message: outbox::invocation_response_to_partition_processor( + caller_invocation_id, + caller_entry_index, + response_result_matcher ) }) } } +pub mod outbox { + use super::*; + + use restate_storage_api::outbox_table::OutboxMessage; + use restate_types::identifiers::InvocationId; + use restate_types::invocation::{InvocationResponse, ResponseResult}; + + pub fn invocation_response_to_partition_processor( + caller_invocation_id: InvocationId, + caller_entry_index: EntryIndex, + response_result_matcher: impl Matcher + 'static, + ) -> impl Matcher { + pat!( + restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( + InvocationResponse { + id: eq(caller_invocation_id), + entry_index: eq(caller_entry_index), + result: response_result_matcher + } + )) + ) + } +} + pub fn completion( entry_index: EntryIndex, completion_result: CompletionResult, diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 690839ef2..7b8e3df99 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -27,7 +27,6 @@ use ::tracing::info; use bytes::Bytes; use bytestring::ByteString; use futures::{StreamExt, TryStreamExt}; -use googletest::matcher::Matcher; use googletest::{all, assert_that, pat, property}; use restate_core::TaskCenter; use restate_invoker_api::{EffectKind, InvokeInputJournal}; @@ -82,16 +81,18 @@ impl TestEnv { } pub async fn create() -> Self { - Self::create_with_options(false).await + Self::create_with_experimental_features(Default::default()).await } - pub async fn create_with_options(disable_idempotency_table: bool) -> Self { + pub async fn create_with_experimental_features( + experimental_features: EnumSet, + ) -> Self { Self::create_with_state_machine(StateMachine::new( 0, /* inbox_seq_number */ 0, /* outbox_seq_number */ None, /* outbox_head_seq_number */ PartitionKey::MIN..=PartitionKey::MAX, - disable_idempotency_table, + experimental_features, )) .await } @@ -955,7 +956,7 @@ async fn truncate_outbox_with_gap() -> Result<(), Error> { outbox_tail_index, Some(outbox_head_index), PartitionKey::MIN..=PartitionKey::MAX, - false, + EnumSet::empty(), )) .await; diff --git a/crates/worker/src/partition/state_machine/tests/workflow.rs b/crates/worker/src/partition/state_machine/tests/workflow.rs index 5c5448f2e..2d620b421 100644 --- a/crates/worker/src/partition/state_machine/tests/workflow.rs +++ b/crates/worker/src/partition/state_machine/tests/workflow.rs @@ -20,11 +20,11 @@ use rstest::*; use std::time::Duration; #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn start_workflow_method(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn start_workflow_method(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -52,7 +52,7 @@ async fn start_workflow_method(#[case] disable_idempotency_table: bool) { ); // Assert service is locked only if we enable the idempotency table - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -184,11 +184,11 @@ async fn start_workflow_method(#[case] disable_idempotency_table: bool) { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn attach_by_workflow_key(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_by_workflow_key(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -322,11 +322,11 @@ async fn attach_by_workflow_key(#[case] disable_idempotency_table: bool) { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[restate_core::test] -async fn purge_completed_workflow(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn purge_completed_workflow(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_random();