Skip to content

Commit

Permalink
Introduce InvocationStatus::Killed (#2335)
Browse files Browse the repository at this point in the history
Add InvocationStatus::Killed as experimental feature.
Fix cancellation/killing of scheduled invocations.
Reply KILLED when receiving attach/get output and deduplication.
Handle killed invocations during leadership change.
Add acknowledgment flag to invoker abort command. This makes sure that after a kill, the invoker will send a "terminal" effect to the state machine.
  • Loading branch information
slinkydeveloper authored Dec 6, 2024
1 parent 1cdb6f8 commit 561679c
Show file tree
Hide file tree
Showing 29 changed files with 927 additions and 291 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,24 @@ jobs:
RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug
testArtifactOutput: sdk-java-kafka-next-gen-integration-test-report

sdk-java-invocation-status-killed:
name: Run SDK-Java integration tests with InvocationStatusKilled
permissions:
contents: read
issues: read
checks: write
pull-requests: write
actions: read
secrets: inherit
needs: docker
uses: restatedev/sdk-java/.github/workflows/integration.yaml@main
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
envVars: |
RESTATE_WORKER__EXPERIMENTAL_FEATURE_INVOCATION_STATUS_KILLED=true
RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug
testArtifactOutput: sdk-java-invocation-status-killed-integration-test-report

sdk-python:
name: Run SDK-Python integration tests
permissions:
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub enum InvocationState {
Running,
Suspended,
BackingOff,
Killed,
Completed,
}

Expand All @@ -142,6 +143,7 @@ impl FromStr for InvocationState {
"suspended" => Self::Suspended,
"backing-off" => Self::BackingOff,
"completed" => Self::Completed,
"killed" => Self::Killed,
_ => Self::Unknown,
})
}
Expand All @@ -157,6 +159,7 @@ impl Display for InvocationState {
InvocationState::Running => write!(f, "running"),
InvocationState::Suspended => write!(f, "suspended"),
InvocationState::BackingOff => write!(f, "backing-off"),
InvocationState::Killed => write!(f, "killed"),
InvocationState::Completed => write!(f, "completed"),
}
}
Expand Down
1 change: 1 addition & 0 deletions cli/src/ui/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub fn invocation_status_style(status: InvocationState) -> Style {
InvocationState::Suspended => DStyle::new().dim(),
InvocationState::BackingOff => DStyle::new().red(),
InvocationState::Completed => DStyle::new().blue(),
InvocationState::Killed => DStyle::new().red(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/invoker-api/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub trait InvokerHandle<SR> {
&mut self,
partition_leader_epoch: PartitionLeaderEpoch,
invocation_id: InvocationId,
// If true, acknowledge the abort. This will generate a Failed effect
acknowledge: bool,
) -> impl Future<Output = Result<(), NotRunningError>> + Send;

fn register_partition(
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub mod test_util {
&mut self,
_partition_leader_epoch: PartitionLeaderEpoch,
_invocation_id: InvocationId,
_acknowledge: bool,
) -> Result<(), NotRunningError> {
Ok(())
}
Expand Down
3 changes: 3 additions & 0 deletions crates/invoker-impl/src/input_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub(crate) enum InputCommand<SR> {
Abort {
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
acknowledge: bool,
},

/// Command used to clean up internal state when a partition leader is going away
Expand Down Expand Up @@ -129,11 +130,13 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
&mut self,
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
acknowledge: bool,
) -> Result<(), NotRunningError> {
self.input
.send(InputCommand::Abort {
partition,
invocation_id,
acknowledge,
})
.map_err(|_| NotRunningError)
}
Expand Down
22 changes: 17 additions & 5 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub use input_command::ChannelStatusReader;
pub use input_command::InvokerHandle;
use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
use restate_types::deployment::PinnedDeployment;
use restate_types::errors::KILLED_INVOCATION_ERROR;
use restate_types::invocation::InvocationTarget;
use restate_types::schema::service::ServiceMetadataResolver;

Expand Down Expand Up @@ -351,8 +352,8 @@ where
self.handle_register_partition(partition, partition_key_range,
storage_reader, sender);
},
InputCommand::Abort { partition, invocation_id } => {
self.handle_abort_invocation(partition, invocation_id);
InputCommand::Abort { partition, invocation_id, acknowledge } => {
self.handle_abort_invocation(partition, invocation_id, acknowledge).await;
}
InputCommand::AbortAllPartition { partition } => {
self.handle_abort_partition(partition);
Expand Down Expand Up @@ -808,12 +809,13 @@ where
restate.invoker.partition_leader_epoch = ?partition,
)
)]
fn handle_abort_invocation(
async fn handle_abort_invocation(
&mut self,
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
acknowledge: bool,
) {
if let Some((_, _, mut ism)) = self
if let Some((tx, _, mut ism)) = self
.invocation_state_machine_manager
.remove_invocation(partition, &invocation_id)
{
Expand All @@ -823,6 +825,14 @@ where
ism.abort();
self.quota.unreserve_slot();
self.status_store.on_end(&partition, &invocation_id);
if acknowledge {
let _ = tx
.send(Effect {
invocation_id,
kind: EffectKind::Failed(KILLED_INVOCATION_ERROR),
})
.await;
}
} else {
trace!("Ignoring Abort command because there is no matching partition/invocation");
}
Expand Down Expand Up @@ -1413,7 +1423,9 @@ mod tests {
assert_eq!(*available_slots, 1);

// Abort the invocation
service_inner.handle_abort_invocation(MOCK_PARTITION, invocation_id);
service_inner
.handle_abort_invocation(MOCK_PARTITION, invocation_id, false)
.await;

// Check the quota
let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota);
Expand Down
46 changes: 30 additions & 16 deletions crates/partition-store/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::invocation_status_table::{
InvocationStatus, InvocationStatusTable, InvocationStatusV1, ReadOnlyInvocationStatusTable,
InvocationStatus, InvocationStatusTable, InvocationStatusV1,
InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable,
};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey};
use restate_types::invocation::InvocationTarget;
use restate_types::storage::StorageCodec;
use std::ops::RangeInclusive;
use tracing::trace;
Expand Down Expand Up @@ -166,10 +166,10 @@ fn delete_invocation_status<S: StorageAccess>(storage: &mut S, invocation_id: &I
storage.delete_key(&create_invocation_status_key(invocation_id));
}

fn invoked_invocations<S: StorageAccess>(
fn invoked_or_killed_invocations<S: StorageAccess>(
storage: &mut S,
partition_key_range: RangeInclusive<PartitionKey>,
) -> Vec<Result<(InvocationId, InvocationTarget)>> {
) -> Vec<Result<InvokedOrKilledInvocationStatusLite>> {
let _x = RocksDbPerfGuard::new("invoked-invocations");
let mut invocations = storage.for_each_key_value_in_place(
FullScanPartitionKeyRange::<InvocationStatusKeyV1>(partition_key_range.clone()),
Expand All @@ -185,7 +185,7 @@ fn invoked_invocations<S: StorageAccess>(
invocations.extend(storage.for_each_key_value_in_place(
FullScanPartitionKeyRange::<InvocationStatusKey>(partition_key_range),
|mut k, mut v| {
let result = read_invoked_full_invocation_id(&mut k, &mut v).transpose();
let result = read_invoked_or_killed_status_lite(&mut k, &mut v).transpose();
if let Some(res) = result {
TableScanIterationDecision::Emit(res)
} else {
Expand Down Expand Up @@ -239,27 +239,41 @@ fn all_invocation_status<S: StorageAccess>(
fn read_invoked_v1_full_invocation_id(
mut k: &mut &[u8],
v: &mut &[u8],
) -> Result<Option<(InvocationId, InvocationTarget)>> {
) -> Result<Option<InvokedOrKilledInvocationStatusLite>> {
let invocation_id = invocation_id_from_v1_key_bytes(&mut k)?;
let invocation_status = StorageCodec::decode::<InvocationStatusV1, _>(v)
.map_err(|err| StorageError::Generic(err.into()))?;
if let InvocationStatus::Invoked(invocation_meta) = invocation_status.0 {
Ok(Some((invocation_id, invocation_meta.invocation_target)))
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_meta.invocation_target,
is_invoked: true,
}))
} else {
Ok(None)
}
}

fn read_invoked_full_invocation_id(
fn read_invoked_or_killed_status_lite(
mut k: &mut &[u8],
v: &mut &[u8],
) -> Result<Option<(InvocationId, InvocationTarget)>> {
) -> Result<Option<InvokedOrKilledInvocationStatusLite>> {
// TODO this can be improved by simply parsing InvocationTarget and the Status enum
let invocation_id = invocation_id_from_key_bytes(&mut k)?;
let invocation_status = StorageCodec::decode::<InvocationStatus, _>(v)
.map_err(|err| StorageError::Generic(err.into()))?;
if let InvocationStatus::Invoked(invocation_meta) = invocation_status {
Ok(Some((invocation_id, invocation_meta.invocation_target)))
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_meta.invocation_target,
is_invoked: true,
}))
} else if let InvocationStatus::Killed(invocation_meta) = invocation_status {
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_meta.invocation_target,
is_invoked: false,
}))
} else {
Ok(None)
}
Expand All @@ -274,10 +288,10 @@ impl ReadOnlyInvocationStatusTable for PartitionStore {
get_invocation_status(self, invocation_id)
}

fn all_invoked_invocations(
fn all_invoked_or_killed_invocations(
&mut self,
) -> impl Stream<Item = Result<(InvocationId, InvocationTarget)>> + Send {
stream::iter(invoked_invocations(
) -> impl Stream<Item = Result<InvokedOrKilledInvocationStatusLite>> + Send {
stream::iter(invoked_or_killed_invocations(
self,
self.partition_key_range().clone(),
))
Expand All @@ -300,10 +314,10 @@ impl<'a> ReadOnlyInvocationStatusTable for PartitionStoreTransaction<'a> {
try_migrate_and_get_invocation_status(self, invocation_id)
}

fn all_invoked_invocations(
fn all_invoked_or_killed_invocations(
&mut self,
) -> impl Stream<Item = Result<(InvocationId, InvocationTarget)>> + Send {
stream::iter(invoked_invocations(
) -> impl Stream<Item = Result<InvokedOrKilledInvocationStatusLite>> + Send {
stream::iter(invoked_or_killed_invocations(
self,
self.partition_key_range().clone(),
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use googletest::prelude::*;
use once_cell::sync::Lazy;
use restate_storage_api::invocation_status_table::{
InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, InvocationStatusV1,
JournalMetadata, ReadOnlyInvocationStatusTable, StatusTimestamps,
InvokedOrKilledInvocationStatusLite, JournalMetadata, ReadOnlyInvocationStatusTable,
StatusTimestamps,
};
use restate_storage_api::Transaction;
use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey};
Expand Down Expand Up @@ -94,6 +95,19 @@ fn invoked_status(invocation_target: InvocationTarget) -> InvocationStatus {
})
}

fn killed_status(invocation_target: InvocationTarget) -> InvocationStatus {
InvocationStatus::Killed(InFlightInvocationMetadata {
invocation_target,
journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()),
pinned_deployment: None,
response_sinks: HashSet::new(),
timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)),
source: Source::Ingress(*RPC_REQUEST_ID),
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
})
}

fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus {
InvocationStatus::Suspended {
metadata: InFlightInvocationMetadata {
Expand Down Expand Up @@ -131,7 +145,7 @@ async fn populate_data<T: InvocationStatusTable>(txn: &mut T) {

txn.put_invocation_status(
&INVOCATION_ID_4,
&invoked_status(INVOCATION_TARGET_4.clone()),
&killed_status(INVOCATION_TARGET_4.clone()),
)
.await;

Expand All @@ -154,22 +168,34 @@ async fn verify_point_lookups<T: InvocationStatusTable>(txn: &mut T) {
txn.get_invocation_status(&INVOCATION_ID_4)
.await
.expect("should not fail"),
invoked_status(INVOCATION_TARGET_4.clone())
killed_status(INVOCATION_TARGET_4.clone())
);
}

async fn verify_all_svc_with_status_invoked<T: InvocationStatusTable>(txn: &mut T) {
async fn verify_all_svc_with_status_invoked_or_killed<T: InvocationStatusTable>(txn: &mut T) {
let actual = txn
.all_invoked_invocations()
.all_invoked_or_killed_invocations()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_that!(
actual,
unordered_elements_are![
eq((*INVOCATION_ID_1, INVOCATION_TARGET_1.clone())),
eq((*INVOCATION_ID_2, INVOCATION_TARGET_2.clone())),
eq((*INVOCATION_ID_4, INVOCATION_TARGET_4.clone()))
eq(InvokedOrKilledInvocationStatusLite {
invocation_id: *INVOCATION_ID_1,
invocation_target: INVOCATION_TARGET_1.clone(),
is_invoked: true,
}),
eq(InvokedOrKilledInvocationStatusLite {
invocation_id: *INVOCATION_ID_2,
invocation_target: INVOCATION_TARGET_2.clone(),
is_invoked: true,
}),
eq(InvokedOrKilledInvocationStatusLite {
invocation_id: *INVOCATION_ID_4,
invocation_target: INVOCATION_TARGET_4.clone(),
is_invoked: false,
}),
]
);
}
Expand All @@ -181,7 +207,7 @@ async fn test_invocation_status() {
populate_data(&mut txn).await;

verify_point_lookups(&mut txn).await;
verify_all_svc_with_status_invoked(&mut txn).await;
verify_all_svc_with_status_invoked_or_killed(&mut txn).await;
}

#[restate_core::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ message InvocationStatusV2 {
INBOXED = 2;
INVOKED = 3;
SUSPENDED = 4;
KILLED = 6;
COMPLETED = 5;
}

Expand Down
Loading

0 comments on commit 561679c

Please sign in to comment.