Skip to content

Commit

Permalink
Implement kill and retry/cancel and retry functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Nov 22, 2024
1 parent de5e0d1 commit 0edc593
Showing 1 changed file with 235 additions and 11 deletions.
246 changes: 235 additions & 11 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::time::Instant;
use tracing::error;
use tracing::{error, info};
use utils::SpanExt;

#[derive(Debug, Hash, enumset::EnumSetType, strum::Display)]
Expand Down Expand Up @@ -852,12 +852,20 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
ctx: &mut StateMachineApplyContext<'_, State>,
InvocationTermination {
invocation_id,
flavor: termination_flavor,
flavor,
}: InvocationTermination,
) -> Result<(), Error> {
match termination_flavor {
match flavor {
TerminationFlavor::Kill => self.on_kill_invocation(ctx, invocation_id).await,
TerminationFlavor::Cancel => self.on_cancel_invocation(ctx, invocation_id).await,
TerminationFlavor::KillAndRestart => {
self.on_kill_and_restart_invocation(ctx, invocation_id)
.await
}
TerminationFlavor::CancelAndRestart => {
self.on_cancel_and_restart_invocation(ctx, invocation_id)
.await
}
}
}

Expand All @@ -880,11 +888,11 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {

match status {
InvocationStatus::Invoked(metadata) => {
self.kill_invoked_invocation(ctx, invocation_id, metadata)
self.kill_invoked_invocation(ctx, invocation_id, metadata, false)
.await?;
}
InvocationStatus::Suspended { metadata, .. } => {
self.kill_suspended_invocation(ctx, invocation_id, metadata)
self.kill_suspended_invocation(ctx, invocation_id, metadata, false)
.await?;
}
InvocationStatus::Inboxed(inboxed) => {
Expand Down Expand Up @@ -1010,6 +1018,157 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Ok(())
}

/// Handles a "kill and restart" termination command for `invocation_id`.
///
/// Reads the current invocation status and dispatches on it:
/// - `Invoked` / `Suspended`: delegates to the matching kill routine, passing `true`
///   as the restart argument.
/// - `Inboxed` / `Scheduled`: the command is ignored (only logged), because the
///   invocation was not executed yet.
/// - `Killed`: persists `restart_when_completed = true` if it was not already set,
///   then sends the abort signal to the invoker again.
/// - `Completed`: only logs; nothing is modified.
/// - `Free`: only sends an abort signal to the invoker (see the comment in that arm).
///
/// # Errors
///
/// Propagates errors from reading the invocation status and from the kill routines.
async fn on_kill_and_restart_invocation<
    State: VirtualObjectStatusTable
        + InvocationStatusTable
        + InboxTable
        + FsmTable
        + StateTable
        + JournalTable
        + OutboxTable
        + TimerTable,
>(
    &mut self,
    ctx: &mut StateMachineApplyContext<'_, State>,
    invocation_id: InvocationId,
) -> Result<(), Error> {
    let status = ctx.get_invocation_status(&invocation_id).await?;

    match status {
        InvocationStatus::Invoked(metadata) => {
            self.kill_invoked_invocation(ctx, invocation_id, metadata, true)
                .await?;
        }
        InvocationStatus::Suspended { metadata, .. } => {
            self.kill_suspended_invocation(ctx, invocation_id, metadata, true)
                .await?;
        }
        InvocationStatus::Inboxed(_) => {
            info!("Received kill and restart command for invocation '{invocation_id}' in inboxed state. Ignoring it as it was not yet executed.");
        }
        InvocationStatus::Scheduled(_) => {
            info!("Received kill and restart command for invocation '{invocation_id}' in scheduled state. Ignoring it as it was not yet executed.");
        }
        InvocationStatus::Killed(mut metadata) => {
            if !metadata.restart_when_completed {
                // Update the restart_when_completed.
                // This might happen if the user sent two kill commands in quick succession,
                // the first without restart and the second with restart.
                metadata.restart_when_completed = true;
                ctx.storage
                    .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata))
                    .await;
            }

            // Nothing else to do here really, let's send the abort signal to the invoker
            // again just in case.
            Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
        }
        InvocationStatus::Completed(_) => {
            info!("Received kill and restart command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command.");
        }
        InvocationStatus::Free => {
            trace!("Received kill and restart command for unknown invocation with id '{invocation_id}'.");
            // We still try to send the abort signal to the invoker,
            // as it might be the case that previously the user sent an abort signal
            // but some message was still between the invoker/PP queues.
            // This can happen because the invoke/resume and the abort invoker messages end up in different queues,
            // and the abort message can overtake the invoke/resume.
            // Consequently the invoker might have not received the abort and the user tried to send it again.
            Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false);
        }
    };

    Ok(())
}

/// Handles a "cancel and restart" termination command for `invocation_id`.
///
/// Reads the current invocation status and dispatches on it:
/// - `Invoked`: cancels the journal leaves, then persists
///   `restart_when_completed = true` if it was not already set.
/// - `Suspended`: cancels the journal leaves; only when `cancel_journal_leaves`
///   returns `true` is the restart flag set and the service resumed.
/// - `Inboxed` / `Scheduled`: the command is ignored (only logged), because the
///   invocation was not executed yet.
/// - `Killed`: persists `restart_when_completed = true` if it was not already set,
///   then sends the abort signal to the invoker again.
/// - `Completed`: only logs; nothing is modified.
/// - `Free`: only sends an abort signal to the invoker (see the comment in that arm).
///
/// # Errors
///
/// Propagates errors from reading the invocation status, from
/// `cancel_journal_leaves` and from `do_resume_service`.
async fn on_cancel_and_restart_invocation<
    State: VirtualObjectStatusTable
        + InvocationStatusTable
        + InboxTable
        + FsmTable
        + StateTable
        + JournalTable
        + OutboxTable
        + TimerTable,
>(
    &mut self,
    ctx: &mut StateMachineApplyContext<'_, State>,
    invocation_id: InvocationId,
) -> Result<(), Error> {
    let status = ctx.get_invocation_status(&invocation_id).await?;

    match status {
        InvocationStatus::Invoked(mut metadata) => {
            // The value returned by cancel_journal_leaves is intentionally not inspected here.
            self.cancel_journal_leaves(
                ctx,
                invocation_id,
                InvocationStatusProjection::Invoked,
                metadata.journal_metadata.length,
            )
            .await?;
            if !metadata.restart_when_completed {
                // Update the restart_when_completed.
                metadata.restart_when_completed = true;
                ctx.storage
                    .put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata))
                    .await;
            }
        }
        InvocationStatus::Suspended {
            mut metadata,
            waiting_for_completed_entries,
        } => {
            // NOTE(review): the restart flag is only set when cancel_journal_leaves returns
            // true. When it returns false, this function does not record the restart
            // request anywhere — confirm this is intended.
            if self
                .cancel_journal_leaves(
                    ctx,
                    invocation_id,
                    InvocationStatusProjection::Suspended(waiting_for_completed_entries),
                    metadata.journal_metadata.length,
                )
                .await?
            {
                metadata.restart_when_completed = true;
                Self::do_resume_service(ctx, invocation_id, metadata).await?;
            }
        }
        InvocationStatus::Inboxed(_) => {
            info!("Received cancel and restart command for invocation '{invocation_id}' in inboxed state. Ignoring it as it was not yet executed.");
        }
        InvocationStatus::Scheduled(_) => {
            info!("Received cancel and restart command for invocation '{invocation_id}' in scheduled state. Ignoring it as it was not yet executed.");
        }
        InvocationStatus::Killed(mut metadata) => {
            if !metadata.restart_when_completed {
                // Update the restart_when_completed.
                // This might happen if the invocation was previously killed without restart
                // and the user now sends a command asking for a restart.
                metadata.restart_when_completed = true;
                ctx.storage
                    .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata))
                    .await;
            }

            // Nothing else to do here really, let's send the abort signal to the invoker
            // again just in case.
            Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
        }
        InvocationStatus::Completed(_) => {
            debug!("Received cancel and restart command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command.");
        }
        InvocationStatus::Free => {
            trace!("Received cancel and restart command for unknown invocation with id '{invocation_id}'.");
            // We still try to send the abort signal to the invoker,
            // as it might be the case that previously the user sent an abort signal
            // but some message was still between the invoker/PP queues.
            // This can happen because the invoke/resume and the abort invoker messages end up in different queues,
            // and the abort message can overtake the invoke/resume.
            // Consequently the invoker might have not received the abort and the user tried to send it again.
            // TODO(review): the original author left an unspecified TODO here — confirm
            // whether sending the abort is the desired behavior for cancel and restart.
            Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false);
        }
    };

    Ok(())
}

async fn terminate_inboxed_invocation<
State: InvocationStatusTable + InboxTable + OutboxTable + FsmTable,
>(
Expand All @@ -1020,8 +1179,10 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
inboxed_invocation: InboxedInvocation,
) -> Result<(), Error> {
let error = match termination_flavor {
TerminationFlavor::Kill => KILLED_INVOCATION_ERROR,
TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR,
TerminationFlavor::Kill | TerminationFlavor::KillAndRestart => KILLED_INVOCATION_ERROR,
TerminationFlavor::Cancel | TerminationFlavor::CancelAndRestart => {
CANCELED_INVOCATION_ERROR
}
};

let InboxedInvocation {
Expand Down Expand Up @@ -1079,8 +1240,10 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
scheduled_invocation: ScheduledInvocation,
) -> Result<(), Error> {
let error = match termination_flavor {
TerminationFlavor::Kill => KILLED_INVOCATION_ERROR,
TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR,
TerminationFlavor::Kill | TerminationFlavor::KillAndRestart => KILLED_INVOCATION_ERROR,
TerminationFlavor::Cancel | TerminationFlavor::CancelAndRestart => {
CANCELED_INVOCATION_ERROR
}
};

let ScheduledInvocation {
Expand Down Expand Up @@ -1142,7 +1305,8 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
&mut self,
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: InvocationId,
metadata: InFlightInvocationMetadata,
mut metadata: InFlightInvocationMetadata,
restart: bool,
) -> Result<(), Error> {
self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length)
.await?;
Expand All @@ -1157,11 +1321,17 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
"Effect: Store killed invocation"
);

metadata.restart_when_completed = restart;
ctx.storage
.put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata))
.await;
Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
} else {
if restart {
// Kill and restart won't work without ExperimentalFeature::InvocationStatusKilled
warn!("Ignoring the kill and restart command for '{invocation_id}' and simply executing kill, as this command is not implemented yet")
}

self.end_invocation(
ctx,
invocation_id,
Expand All @@ -1187,11 +1357,16 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
&mut self,
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: InvocationId,
metadata: InFlightInvocationMetadata,
mut metadata: InFlightInvocationMetadata,
restart: bool,
) -> Result<(), Error> {
self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length)
.await?;

if restart {
metadata.restart_when_completed = true
}

// No need to go through the Killed state when we're suspended,
// because it means we already got a terminal state from the invoker.
self.end_invocation(
Expand Down Expand Up @@ -1680,6 +1855,12 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
// If given, this will override any Output Entry available in the journal table
response_result_override: Option<ResponseResult>,
) -> Result<(), Error> {
if invocation_metadata.restart_when_completed {
return self
.restart_invocation(ctx, invocation_id, invocation_metadata)
.await;
}

let invocation_target = invocation_metadata.invocation_target.clone();
let journal_length = invocation_metadata.journal_metadata.length;
let completion_retention_time = invocation_metadata.completion_retention_duration;
Expand Down Expand Up @@ -1756,6 +1937,49 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Ok(())
}

async fn restart_invocation<
State: InboxTable
+ VirtualObjectStatusTable
+ JournalTable
+ OutboxTable
+ FsmTable
+ InvocationStatusTable
+ StateTable,
>(
&mut self,
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: InvocationId,
mut invocation_metadata: InFlightInvocationMetadata,
) -> Result<(), Error> {
info!("Restarting invocation");

// We need to cleanup the journal except the first item.
let journal_length = invocation_metadata.journal_metadata.length;
debug_if_leader!(
ctx.is_leader,
restate.journal.length = journal_length,
"Effect: Drop journal except first entry"
);
ctx.storage
.delete_journal_range(&invocation_id, 1..journal_length)
.await;

// Let's reset a bunch of parameters in the InFlightInvocationMetadata
invocation_metadata.journal_metadata.length = 1;
invocation_metadata.pinned_deployment = None;
invocation_metadata.restart_when_completed = false;

Self::invoke(
ctx,
invocation_id,
invocation_metadata,
InvokeInputJournal::NoCachedJournal,
)
.await?;

Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn send_response_to_sinks<State: OutboxTable + FsmTable>(
&mut self,
Expand Down

0 comments on commit 0edc593

Please sign in to comment.