diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index 81675c136f..a2c48764fa 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -10,6 +10,7 @@ use super::{fixtures, matchers, *}; +use crate::partition::state_machine::tests::matchers::{invoked, killed, suspended}; use assert2::assert; use assert2::let_assert; use googletest::any; @@ -157,6 +158,7 @@ async fn terminate_scheduled_invocation( response: eq(IngressResponseResult::Failure(match termination_flavor { TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + _ => panic!("Unexpected termination flavor"), })) })) ); @@ -172,6 +174,288 @@ async fn terminate_scheduled_invocation( Ok(()) } +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::CancelAndRestart)] +#[tokio::test] +async fn terminate_and_restart_scheduled_invocation_has_no_effect( + #[case] experimental_features: EnumSet<ExperimentalFeature>, + #[case] termination_flavor: TerminationFlavor, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; + + let invocation_id = InvocationId::mock_random(); + + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + execution_time: Some(MillisSinceEpoch::MAX), + ..ServiceInvocation::mock() + })) + .await; + + // assert that inboxed invocation is in invocation_status + let scheduled_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Scheduled(_) = scheduled_invocation_status); + + // This has no effect + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: termination_flavor, + })) + .await; + assert_that!(actions, empty()); + + // Invocation status didn't change at all + assert_eq!( + scheduled_invocation_status, + test_env + .storage() + .get_invocation_status(&invocation_id) + .await + .unwrap() + ); + + test_env.shutdown().await; + Ok(()) +} + +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::CancelAndRestart)] +#[tokio::test] +async fn terminate_and_restart_inboxed_invocation_has_no_effect( + #[case] experimental_features: EnumSet<ExperimentalFeature>, + #[case] termination_flavor: TerminationFlavor, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; + + let invocation_target = InvocationTarget::mock_virtual_object(); + let invocation_id = InvocationId::mock_generate(&invocation_target); + + // First invocation takes the lock + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id: InvocationId::mock_generate(&invocation_target), + invocation_target: invocation_target.clone(), + ..ServiceInvocation::mock() + })) + .await; + + // This invocation will be inboxed + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + ..ServiceInvocation::mock() + })) + .await; + + // assert that inboxed invocation is in invocation_status + let inboxed_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Inboxed(_) = inboxed_invocation_status); + + // This has no effect + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: termination_flavor, + })) + .await; + assert_that!(actions, empty()); + + // Invocation status didn't change at all + assert_eq!( + inboxed_invocation_status, + test_env + .storage() + .get_invocation_status(&invocation_id) + .await + .unwrap() + ); + + test_env.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn kill_and_restart_when_invoked() -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features( + ExperimentalFeature::InvocationStatusKilled.into(), + ) + .await; + + let invocation_id = InvocationId::mock_random(); + + // Start invocation and pin the deployment and add one entry (doesn't matter which one) + let _ = test_env + .apply_multiple([ + Command::Invoke(ServiceInvocation { + invocation_id, + ..ServiceInvocation::mock() + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { + deployment_id: Default::default(), + service_protocol_version: Default::default(), + }), + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 1, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::ClearAllState), + }, + }), + ]) + .await; + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + invoked() + ); + + // First we should transition to killed status + let _ = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: TerminationFlavor::KillAndRestart, + })) + .await; + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + killed() + ); + + // Now send the Failed invoker effect to complete the kill procedure + let actions = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + })) + .await; + + // Should have restarted the invocation + assert_that!( + actions, + contains(matchers::actions::invoke_for_id(invocation_id)) + ); + + // We should be back to invoked state with a reset journal and reset deployment id + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Invoked(pat!( + InFlightInvocationMetadata { + journal_metadata: pat!(JournalMetadata { length: eq(1) }), + pinned_deployment: none(), + restart_when_completed: eq(false) + } + ))) + ); + + test_env.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn kill_and_restart_when_suspended() -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features( + ExperimentalFeature::InvocationStatusKilled.into(), + ) + .await; + + let invocation_id = InvocationId::mock_random(); + + // Start invocation, pin the deployment, add one completable entry, suspend + let _ = test_env + .apply_multiple([ + Command::Invoke(ServiceInvocation { + invocation_id, + ..ServiceInvocation::mock() + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { + deployment_id: Default::default(), + service_protocol_version: Default::default(), + }), + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 1, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::Awakeable( + AwakeableEntry { result: None }, + )), + }, + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::Suspended { + waiting_for_completed_entries: [1].into(), + }, + }), + ]) + .await; + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + suspended() + ); + + // This should immediately restart + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: TerminationFlavor::KillAndRestart, + })) + .await; + + // Should have restarted the invocation + assert_that!( + actions, + contains(matchers::actions::invoke_for_id(invocation_id)) + ); + + // We should be back to invoked state with a reset journal and reset deployment id + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Invoked(pat!( + InFlightInvocationMetadata { + journal_metadata: pat!(JournalMetadata { length: eq(1) }), + pinned_deployment: none(), + restart_when_completed: eq(false) + } + ))) + ); + + test_env.shutdown().await; + Ok(()) +} + #[rstest] #[case(ExperimentalFeature::InvocationStatusKilled.into())] #[case(EnumSet::empty())] diff --git a/crates/worker/src/partition/state_machine/tests/matchers.rs b/crates/worker/src/partition/state_machine/tests/matchers.rs index eee3267af1..9d84d3eef4 100644 --- a/crates/worker/src/partition/state_machine/tests/matchers.rs +++ b/crates/worker/src/partition/state_machine/tests/matchers.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use bytestring::ByteString; use googletest::prelude::*; +use restate_storage_api::invocation_status_table::InvocationStatus; use restate_storage_api::timer_table::{TimerKey, TimerKeyKind}; use restate_types::errors::codes; use restate_types::identifiers::EntryIndex; @@ -152,6 +153,18 @@ pub mod outbox { } } +pub fn invoked() -> impl Matcher<ActualT = InvocationStatus> { + pat!(InvocationStatus::Invoked { .. }) +} + +pub fn suspended() -> impl Matcher<ActualT = InvocationStatus> { + pat!(InvocationStatus::Suspended { .. }) +} + +pub fn killed() -> impl Matcher<ActualT = InvocationStatus> { + pat!(InvocationStatus::Killed { .. }) +} + pub fn completion( entry_index: EntryIndex, completion_result: CompletionResult,