Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Nov 22, 2024
1 parent a6a0be1 commit 53e2e64
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 0 deletions.
284 changes: 284 additions & 0 deletions crates/worker/src/partition/state_machine/tests/kill_cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use super::{fixtures, matchers, *};

use crate::partition::state_machine::tests::matchers::{invoked, killed, suspended};
use assert2::assert;
use assert2::let_assert;
use googletest::any;
Expand Down Expand Up @@ -156,6 +157,7 @@ async fn terminate_scheduled_invocation(
response: eq(IngressResponseResult::Failure(match termination_flavor {
TerminationFlavor::Kill => KILLED_INVOCATION_ERROR,
TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR,
_ => panic!("Unexpected termination flavor"),
}))
}))
);
Expand All @@ -171,6 +173,288 @@ async fn terminate_scheduled_invocation(
Ok(())
}

#[rstest]
#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::KillAndRestart)]
#[case(EnumSet::empty(), TerminationFlavor::KillAndRestart)]
#[case(EnumSet::empty(), TerminationFlavor::CancelAndRestart)]
#[tokio::test]
async fn terminate_and_restart_scheduled_invocation_has_no_effect(
#[case] experimental_features: EnumSet<ExperimentalFeature>,
#[case] termination_flavor: TerminationFlavor,
) -> anyhow::Result<()> {
let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await;

let invocation_id = InvocationId::mock_random();

let _ = test_env
.apply(Command::Invoke(ServiceInvocation {
invocation_id,
execution_time: Some(MillisSinceEpoch::MAX),
..ServiceInvocation::mock()
}))
.await;

// assert that inboxed invocation is in invocation_status
let scheduled_invocation_status = test_env
.storage()
.get_invocation_status(&invocation_id)
.await?;
assert!(let InvocationStatus::Scheduled(_) = scheduled_invocation_status);

// This has no effect
let actions = test_env
.apply(Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: termination_flavor,
}))
.await;
assert_that!(actions, empty());

// Invocation status didn't change at all
assert_eq!(
scheduled_invocation_status,
test_env
.storage()
.get_invocation_status(&invocation_id)
.await
.unwrap()
);

test_env.shutdown().await;
Ok(())
}

#[rstest]
#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::KillAndRestart)]
#[case(EnumSet::empty(), TerminationFlavor::KillAndRestart)]
#[case(EnumSet::empty(), TerminationFlavor::CancelAndRestart)]
#[tokio::test]
async fn terminate_and_restart_inboxed_invocation_has_no_effect(
#[case] experimental_features: EnumSet<ExperimentalFeature>,
#[case] termination_flavor: TerminationFlavor,
) -> anyhow::Result<()> {
let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await;

let invocation_target = InvocationTarget::mock_virtual_object();
let invocation_id = InvocationId::mock_generate(&invocation_target);

// First invocation takes the lock
let _ = test_env
.apply(Command::Invoke(ServiceInvocation {
invocation_id: InvocationId::mock_generate(&invocation_target),
invocation_target: invocation_target.clone(),
..ServiceInvocation::mock()
}))
.await;

// This invocation will be inboxed
let _ = test_env
.apply(Command::Invoke(ServiceInvocation {
invocation_id,
invocation_target: invocation_target.clone(),
..ServiceInvocation::mock()
}))
.await;

// assert that inboxed invocation is in invocation_status
let inboxed_invocation_status = test_env
.storage()
.get_invocation_status(&invocation_id)
.await?;
assert!(let InvocationStatus::Inboxed(_) = inboxed_invocation_status);

// This has no effect
let actions = test_env
.apply(Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: termination_flavor,
}))
.await;
assert_that!(actions, empty());

// Invocation status didn't change at all
assert_eq!(
inboxed_invocation_status,
test_env
.storage()
.get_invocation_status(&invocation_id)
.await
.unwrap()
);

test_env.shutdown().await;
Ok(())
}

#[tokio::test]
async fn kill_and_restart_when_invoked() -> anyhow::Result<()> {
let mut test_env = TestEnv::create_with_experimental_features(
ExperimentalFeature::InvocationStatusKilled.into(),
)
.await;

let invocation_id = InvocationId::mock_random();

// Start invocation and pin the deployment and add one entry (doesn't matter which one)
let _ = test_env
.apply_multiple([
Command::Invoke(ServiceInvocation {
invocation_id,
..ServiceInvocation::mock()
}),
Command::InvokerEffect(InvokerEffect {
invocation_id,
kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment {
deployment_id: Default::default(),
service_protocol_version: Default::default(),
}),
}),
Command::InvokerEffect(InvokerEffect {
invocation_id,
kind: InvokerEffectKind::JournalEntry {
entry_index: 1,
entry: ProtobufRawEntryCodec::serialize_enriched(Entry::ClearAllState),
},
}),
])
.await;
assert_that!(
test_env
.storage()
.get_invocation_status(&invocation_id)
.await?,
invoked()
);

// First we should transition to killed status
let _ = test_env
.apply(Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: TerminationFlavor::KillAndRestart,
}))
.await;
assert_that!(
test_env
.storage()
.get_invocation_status(&invocation_id)
.await?,
killed()
);

// Now send the Failed invoker effect to complete the kill procedure
let actions = test_env
.apply(Command::InvokerEffect(InvokerEffect {
invocation_id,
kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR),
}))
.await;

// Should have restarted the invocation
assert_that!(
actions,
contains(matchers::actions::invoke_for_id(invocation_id))
);

// We should be back to invoked state with a reset journal and reset deployment id
assert_that!(
test_env
.storage()
.get_invocation_status(&invocation_id)
.await?,
pat!(InvocationStatus::Invoked(pat!(
InFlightInvocationMetadata {
journal_metadata: pat!(JournalMetadata { length: eq(1) }),
pinned_deployment: none(),
restart_when_completed: eq(false)
}
)))
);

test_env.shutdown().await;
Ok(())
}

#[tokio::test]
async fn kill_and_restart_when_suspended() -> anyhow::Result<()> {
let mut test_env = TestEnv::create_with_experimental_features(
ExperimentalFeature::InvocationStatusKilled.into(),
)
.await;

let invocation_id = InvocationId::mock_random();

// Start invocation, pin the deployment, add one completable entry, suspend
let _ = test_env
.apply_multiple([
Command::Invoke(ServiceInvocation {
invocation_id,
..ServiceInvocation::mock()
}),
Command::InvokerEffect(InvokerEffect {
invocation_id,
kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment {
deployment_id: Default::default(),
service_protocol_version: Default::default(),
}),
}),
Command::InvokerEffect(InvokerEffect {
invocation_id,
kind: InvokerEffectKind::JournalEntry {
entry_index: 1,
entry: ProtobufRawEntryCodec::serialize_enriched(Entry::Awakeable(
AwakeableEntry { result: None },
)),
},
}),
Command::InvokerEffect(InvokerEffect {
invocation_id,
kind: InvokerEffectKind::Suspended {
waiting_for_completed_entries: [1].into(),
},
}),
])
.await;
assert_that!(
test_env
.storage()
.get_invocation_status(&invocation_id)
.await?,
suspended()
);

// This should immediately restart
let actions = test_env
.apply(Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: TerminationFlavor::KillAndRestart,
}))
.await;

// Should have restarted the invocation
assert_that!(
actions,
contains(matchers::actions::invoke_for_id(invocation_id))
);

// We should be back to invoked state with a reset journal and reset deployment id
assert_that!(
test_env
.storage()
.get_invocation_status(&invocation_id)
.await?,
pat!(InvocationStatus::Invoked(pat!(
InFlightInvocationMetadata {
journal_metadata: pat!(JournalMetadata { length: eq(1) }),
pinned_deployment: none(),
restart_when_completed: eq(false)
}
)))
);

test_env.shutdown().await;
Ok(())
}

#[rstest]
#[case(ExperimentalFeature::InvocationStatusKilled.into())]
#[case(EnumSet::empty())]
Expand Down
13 changes: 13 additions & 0 deletions crates/worker/src/partition/state_machine/tests/matchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use bytes::Bytes;
use bytestring::ByteString;
use googletest::prelude::*;
use restate_storage_api::invocation_status_table::InvocationStatus;
use restate_storage_api::timer_table::{TimerKey, TimerKeyKind};
use restate_types::errors::codes;
use restate_types::identifiers::EntryIndex;
Expand Down Expand Up @@ -152,6 +153,18 @@ pub mod outbox {
}
}

pub fn invoked() -> impl Matcher<ActualT = InvocationStatus> {
pat!(InvocationStatus::Invoked { .. })
}

pub fn suspended() -> impl Matcher<ActualT = InvocationStatus> {
pat!(InvocationStatus::Suspended { .. })
}

pub fn killed() -> impl Matcher<ActualT = InvocationStatus> {
pat!(InvocationStatus::Killed { .. })
}

pub fn completion(
entry_index: EntryIndex,
completion_result: CompletionResult,
Expand Down

0 comments on commit 53e2e64

Please sign in to comment.