From d6cfaa301db9dce8996a4a7eafe5ddfd926331cb Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 9 Dec 2024 12:19:31 +0100 Subject: [PATCH 1/5] Proposal of new semantics --- crates/types/src/invocation.rs | 48 +++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index 240760f7c..9cb3c8a97 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -273,6 +273,42 @@ impl fmt::Display for InvocationTarget { } } +/// Concurrency guarantee of the invocation request. +#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +enum ConcurrencyGuarantee { + /// Enqueue the invocation when the target is busy + EnqueueWhenBusy { + queue_target: ByteString, + queue_key: ByteString, + }, + /// No queueing, just execute the request + MaxParallelism, + /// Use the default from the target semantics + #[default] + UseTargetDefault +} + +/// Behavior when sending an invocation request and the request already exists. +#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +enum BehaviorOnExistingInvocationId { + /// Attach to the existing invocation + Attach, + /// Reply with "conflict" error response + ReplyConflict, + /// Just drop the request + Drop, + /// Use the default from the target/idempotency key semantics + #[default] + UseTargetDefault +} + +/// Invocation request flow is as follows: +/// +/// 1. Invocation is proposed in the PP log. +/// 2. PP will first check if another invocation with the same id exists. If true, it applies the [`BehaviorOnExistingInvocationId`] +/// 3. If the invocation id doesn't exist, wait for the `execution_time` if present, otherwise continue immediately. +/// 4. Apply the given [`ConcurrencyGuarantee`]. +/// 5. Finally execute it sending the request to the service endpoint. 
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct InvocationRequestHeader { pub id: InvocationId, @@ -280,12 +316,18 @@ pub struct InvocationRequestHeader { pub headers: Vec
, pub span_context: ServiceInvocationSpanContext, - /// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`]. - pub idempotency_key: Option, - + /// Behavior to apply on an existing invocation id. + #[serde(default)] + pub behavior_on_existing_idempotency_key: BehaviorOnExistingInvocationId, /// Time when the request should be executed. If none, it's executed immediately. pub execution_time: Option, + /// Concurrency behavior to apply. + #[serde(default)] + pub concurrency_guarantee: ConcurrencyGuarantee, + /// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`]. + /// This value is propagated only for observability purposes, as the invocation id is already deterministic given the invocation id. + pub idempotency_key: Option, /// Retention duration of the completed status. If none, the completed status is not retained. pub completion_retention_duration: Option, } From 98b294b2f0479ee69b3cd15a01288d11bead2ffc Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 9 Dec 2024 12:27:08 +0100 Subject: [PATCH 2/5] Describe target default behaviors --- crates/types/src/invocation.rs | 36 ++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index 9cb3c8a97..38c77bd04 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -285,7 +285,24 @@ enum ConcurrencyGuarantee { MaxParallelism, /// Use the default from the target semantics #[default] - UseTargetDefault + UseTargetDefault, +} + +impl ConcurrencyGuarantee { + fn infer_target_default(invocation_target: &InvocationTarget) -> ConcurrencyGuarantee { + match invocation_target { + InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Exclusive, name, key, .. 
} => { + ConcurrencyGuarantee::EnqueueWhenBusy { + queue_target: name.clone(), + queue_key: key.clone(), + } + } + InvocationTarget::Service { .. } | + InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Shared, .. } | + /* For workflow, there is no enqueueing as we have the behavior on existing invocation id that guarantees correctness */ + InvocationTarget::Workflow { .. } => ConcurrencyGuarantee::MaxParallelism + } + } } /// Behavior when sending an invocation request and the request already exists. @@ -299,7 +316,22 @@ enum BehaviorOnExistingInvocationId { Drop, /// Use the default from the target/idempotency key semantics #[default] - UseTargetDefault + UseTargetDefault, +} + +impl BehaviorOnExistingInvocationId { + fn infer_target_default( + invocation_target_type: InvocationTargetType, + has_idempotency_key: bool, + ) -> BehaviorOnExistingInvocationId { + match (invocation_target_type, has_idempotency_key) { + (InvocationTargetType::Workflow(WorkflowHandlerType::Workflow), _) => { + BehaviorOnExistingInvocationId::ReplyConflict + } + (_, true) => BehaviorOnExistingInvocationId::Attach, + _ => BehaviorOnExistingInvocationId::Drop, + } + } } /// Invocation request flow is as follows: From 1726011d4484b17c3475f5de99b693be75a1adda Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 10 Dec 2024 09:35:09 +0100 Subject: [PATCH 3/5] Better description and names --- crates/types/src/invocation.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index 38c77bd04..d83133911 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -276,10 +276,12 @@ impl fmt::Display for InvocationTarget { /// Concurrency guarantee of the invocation request. 
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] enum ConcurrencyGuarantee { - /// Enqueue the invocation when the target is busy + /// Enqueue the invocation in the inbox when the target is busy EnqueueWhenBusy { - queue_target: ByteString, - queue_key: ByteString, + /// fka ServiceId.name + inbox_target: ByteString, + /// fka ServiceId.key + inbox_key: ByteString, }, /// No queueing, just execute the request MaxParallelism, @@ -293,8 +295,8 @@ impl ConcurrencyGuarantee { match invocation_target { InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Exclusive, name, key, .. } => { ConcurrencyGuarantee::EnqueueWhenBusy { - queue_target: name.clone(), - queue_key: key.clone(), + inbox_target: name.clone(), + inbox_key: key.clone(), } } InvocationTarget::Service { .. } | @@ -337,7 +339,7 @@ impl BehaviorOnExistingInvocationId { /// Invocation request flow is as follows: /// /// 1. Invocation is proposed in the PP log. -/// 2. PP will first check if another invocation with the same id exists. If true, it applies the [`BehaviorOnExistingInvocationId`] +/// 2. PP will first check if another invocation with the same id exists. If it exists, it applies the [`BehaviorOnExistingInvocationId`], otherwise moves to point 3. /// 3. If the invocation id doesn't exist, wait for the `execution_time` if present, otherwise continue immediately. /// 4. Apply the given [`ConcurrencyGuarantee`]. /// 5. Finally execute it sending the request to the service endpoint. 
From 8da96befd23a084723ce604468a556a61a7080a7 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 19 Dec 2024 10:56:23 +0100 Subject: [PATCH 4/5] Feedback --- crates/types/src/invocation.rs | 42 +++++++++++++++++----------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index d83133911..9308fb198 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -275,26 +275,26 @@ impl fmt::Display for InvocationTarget { /// Concurrency guarantee of the invocation request. #[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -enum ConcurrencyGuarantee { - /// Enqueue the invocation in the inbox when the target is busy - EnqueueWhenBusy { +enum Concurrency { + /// Invocation executes sequential wrt the inbox address (target + key) + Sequential { /// fka ServiceId.name inbox_target: ByteString, /// fka ServiceId.key inbox_key: ByteString, }, /// No queueing, just execute the request - MaxParallelism, - /// Use the default from the target semantics + Concurrent, + /// Use the default from the target semantics, see [`Concurrency::infer_target_default`] #[default] UseTargetDefault, } -impl ConcurrencyGuarantee { - fn infer_target_default(invocation_target: &InvocationTarget) -> ConcurrencyGuarantee { +impl Concurrency { + fn infer_target_default(invocation_target: &InvocationTarget) -> Concurrency { match invocation_target { InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Exclusive, name, key, .. } => { - ConcurrencyGuarantee::EnqueueWhenBusy { + Concurrency::Sequential { inbox_target: name.clone(), inbox_key: key.clone(), } @@ -302,36 +302,36 @@ impl ConcurrencyGuarantee { InvocationTarget::Service { .. } | InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Shared, .. 
} | /* For workflow, there is no enqueueing as we have the behavior on existing invocation id that guarantees correctness */ - InvocationTarget::Workflow { .. } => ConcurrencyGuarantee::MaxParallelism + InvocationTarget::Workflow { .. } => Concurrency::Concurrent } } } -/// Behavior when sending an invocation request and the request already exists. +/// Behavior when sending an invocation request and the invocation already exists. #[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -enum BehaviorOnExistingInvocationId { +enum IfExists { /// Attach to the existing invocation Attach, /// Reply with "conflict" error response ReplyConflict, /// Just drop the request Drop, - /// Use the default from the target/idempotency key semantics + /// Use the default from the target/idempotency key semantics, see [`IfExists::infer_target_default`] #[default] UseTargetDefault, } -impl BehaviorOnExistingInvocationId { +impl IfExists { fn infer_target_default( invocation_target_type: InvocationTargetType, has_idempotency_key: bool, - ) -> BehaviorOnExistingInvocationId { + ) -> IfExists { match (invocation_target_type, has_idempotency_key) { (InvocationTargetType::Workflow(WorkflowHandlerType::Workflow), _) => { - BehaviorOnExistingInvocationId::ReplyConflict + IfExists::ReplyConflict } - (_, true) => BehaviorOnExistingInvocationId::Attach, - _ => BehaviorOnExistingInvocationId::Drop, + (_, true) => IfExists::Attach, + _ => IfExists::Drop, } } } @@ -339,9 +339,9 @@ impl BehaviorOnExistingInvocationId { /// Invocation request flow is as follows: /// /// 1. Invocation is proposed in the PP log. -/// 2. PP will first check if another invocation with the same id exists. If it exists, it applies the [`BehaviorOnExistingInvocationId`], otherwise moves to point 3. +/// 2. PP will first check if another invocation with the same id exists. If it exists, it applies the [`IfExists`], otherwise moves to point 3. /// 3. 
If the invocation id doesn't exist, wait for the `execution_time` if present, otherwise continue immediately. -/// 4. Apply the given [`ConcurrencyGuarantee`]. +/// 4. Apply the given [`Concurrency`]. /// 5. Finally execute it sending the request to the service endpoint. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct InvocationRequestHeader { @@ -352,12 +352,12 @@ pub struct InvocationRequestHeader { /// Behavior to apply on an existing invocation id. #[serde(default)] - pub behavior_on_existing_idempotency_key: BehaviorOnExistingInvocationId, + pub if_exists: IfExists, /// Time when the request should be executed. If none, it's executed immediately. pub execution_time: Option, /// Concurrency behavior to apply. #[serde(default)] - pub concurrency_guarantee: ConcurrencyGuarantee, + pub concurrency: Concurrency, /// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`]. /// This value is propagated only for observability purposes, as the invocation id is already deterministic given the invocation id. From a3205d90f0e1091bf17d564768d4053c9c40fd23 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 19 Dec 2024 11:21:18 +0100 Subject: [PATCH 5/5] Remove UseTargetDefault, just use Option for defaulting. 
Implement PartitionKey assignment logic --- crates/types/src/identifiers.rs | 45 +++++++++++++++++++++++---- crates/types/src/invocation.rs | 54 ++++++++++++++++++++------------- 2 files changed, 72 insertions(+), 27 deletions(-) diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index c950cdc0f..dbe114fd8 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -27,6 +27,7 @@ use crate::errors::IdDecodeError; use crate::id_util::IdDecoder; use crate::id_util::IdEncoder; use crate::id_util::IdResourceType; +use crate::invocation; use crate::invocation::{InvocationTarget, InvocationTargetType, WorkflowHandlerType}; use crate::time::MillisSinceEpoch; @@ -130,13 +131,19 @@ pub type EntryIndex = u32; /// which identifies a consecutive range of partition keys. pub type PartitionKey = u64; -/// Returns the partition key computed from either the service_key, or idempotency_key, if possible +/// Returns the partition key. The precedence is as follows: +/// +/// 1. If there is a service key, that's what we use to compute the key +/// 2. Otherwise, we use the concurrency key +/// 3. 
Otherwise, we use the idempotency key fn deterministic_partition_key( service_key: Option<&str>, + concurrency_key: Option<&str>, idempotency_key: Option<&str>, ) -> Option { service_key .map(partitioner::HashPartitioner::compute_partition_key) + .or_else(|| concurrency_key.map(partitioner::HashPartitioner::compute_partition_key)) .or_else(|| idempotency_key.map(partitioner::HashPartitioner::compute_partition_key)) } @@ -422,12 +429,19 @@ pub trait WithInvocationId { pub type EncodedInvocationId = [u8; InvocationId::SIZE_IN_BYTES]; impl InvocationId { - pub fn generate(invocation_target: &InvocationTarget, idempotency_key: Option<&str>) -> Self { + pub fn generate( + invocation_target: &InvocationTarget, + invocation_concurrency: Option<&invocation::Concurrency>, + idempotency_key: Option<&str>, + ) -> Self { // --- Partition key generation let partition_key = // Either try to generate the deterministic partition key, if possible deterministic_partition_key( - invocation_target.key().map(|bs| bs.as_ref()), + invocation_target.key() + .map(|bs| bs.as_ref()), + invocation_concurrency + .and_then(|c| c.inbox_key().map(|bs| bs.as_ref())), idempotency_key, ) // If no deterministic partition key can be generated, just pick a random number @@ -585,6 +599,8 @@ impl IdempotencyId { // * For services with key, the partition key is the hash(service key), this due to the virtual object locking requirement. let partition_key = deterministic_partition_key( service_key.as_ref().map(|bs| bs.as_ref()), + // This data structure won't need the partition key anymore, by the time we enable the new custom concurrency feature. 
+ None, Some(&idempotency_key), ) .expect("A deterministic partition key can always be generated for idempotency id"); @@ -975,7 +991,7 @@ mod mocks { impl InvocationId { pub fn mock_generate(invocation_target: &InvocationTarget) -> Self { - InvocationId::generate(invocation_target, None) + InvocationId::generate(invocation_target, None, None) } pub fn mock_random() -> Self { @@ -1132,11 +1148,28 @@ mod tests { let idempotent_key = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); assert_eq!( - InvocationId::generate(&invocation_target, Some(&idempotent_key)), - InvocationId::generate(&invocation_target, Some(&idempotent_key)) + InvocationId::generate(&invocation_target, None, Some(&idempotent_key)), + InvocationId::generate(&invocation_target, None, Some(&idempotent_key)) ); } + #[test] + fn deterministic_partition_key_for_service_request() { + let invocation_target = InvocationTarget::mock_service(); + let concurrency = invocation::Concurrency::Sequential { + inbox_target: invocation_target.service_name().clone(), + inbox_key: Alphanumeric + .sample_string(&mut rand::thread_rng(), 16) + .into(), + }; + + let id1 = InvocationId::generate(&invocation_target, Some(&concurrency), None); + let id2 = InvocationId::generate(&invocation_target, Some(&concurrency), None); + + assert_eq!(id1.partition_key(), id2.partition_key()); + assert_ne!(id1.invocation_uuid(), id2.invocation_uuid()); + } + #[test] fn deterministic_invocation_id_for_workflow_request() { let invocation_target = InvocationTarget::mock_workflow(); diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index 9308fb198..84b00ae6e 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -274,8 +274,8 @@ impl fmt::Display for InvocationTarget { } /// Concurrency guarantee of the invocation request. 
-#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -enum Concurrency { +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum Concurrency { /// Invocation executes sequential wrt the inbox address (target + key) Sequential { /// fka ServiceId.name @@ -285,13 +285,18 @@ enum Concurrency { }, /// No queueing, just execute the request Concurrent, - /// Use the default from the target semantics, see [`Concurrency::infer_target_default`] - #[default] - UseTargetDefault, } impl Concurrency { - fn infer_target_default(invocation_target: &InvocationTarget) -> Concurrency { + pub fn inbox_key(&self) -> Option<&ByteString> { + if let Concurrency::Sequential { inbox_key, .. } = self { + Some(inbox_key) + } else { + None + } + } + + pub fn infer_target_default(invocation_target: &InvocationTarget) -> Concurrency { match invocation_target { InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Exclusive, name, key, .. } => { Concurrency::Sequential { @@ -308,21 +313,18 @@ impl Concurrency { } /// Behavior when sending an invocation request and the invocation already exists. -#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -enum IfExists { +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum IfExists { /// Attach to the existing invocation Attach, /// Reply with "conflict" error response ReplyConflict, /// Just drop the request Drop, - /// Use the default from the target/idempotency key semantics, see [`IfExists::infer_target_default`] - #[default] - UseTargetDefault, } impl IfExists { - fn infer_target_default( + pub fn infer_target_default( invocation_target_type: InvocationTargetType, has_idempotency_key: bool, ) -> IfExists { @@ -350,14 +352,12 @@ pub struct InvocationRequestHeader { pub headers: Vec
, pub span_context: ServiceInvocationSpanContext, - /// Behavior to apply on an existing invocation id. - #[serde(default)] - pub if_exists: IfExists, + /// Behavior to apply on an existing invocation id. If not present, [`IfExists::infer_target_default`] should be used. + pub if_exists: Option, /// Time when the request should be executed. If none, it's executed immediately. pub execution_time: Option, - /// Concurrency behavior to apply. - #[serde(default)] - pub concurrency: Concurrency, + /// Concurrency behavior to apply. If not present, [`Concurrency::infer_target_default`] should be used. + pub concurrency: Option, /// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`]. /// This value is propagated only for observability purposes, as the invocation id is already deterministic given the invocation id. @@ -376,6 +376,8 @@ impl InvocationRequestHeader { idempotency_key: None, execution_time: None, completion_retention_duration: None, + if_exists: None, + concurrency: None, } } @@ -428,10 +430,13 @@ pub struct ServiceInvocation { pub source: Source, pub span_context: ServiceInvocationSpanContext, pub headers: Vec
, - /// Time when the request should be executed + pub idempotency_key: Option, + + pub if_exists: Option, pub execution_time: Option, + pub concurrency: Option, + pub completion_retention_duration: Option, - pub idempotency_key: Option, // Where to send the response, if any pub response_sink: Option, @@ -458,10 +463,12 @@ impl ServiceInvocation { span_context: request.header.span_context, headers: request.header.headers, execution_time: request.header.execution_time, + concurrency: request.header.concurrency, completion_retention_duration: request.header.completion_retention_duration, idempotency_key: request.header.idempotency_key, response_sink: None, submit_notification_sink: None, + if_exists: request.header.if_exists, } } @@ -479,9 +486,11 @@ impl ServiceInvocation { span_context: ServiceInvocationSpanContext::empty(), headers: vec![], execution_time: None, + concurrency: None, completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, + if_exists: None, } } @@ -960,6 +969,7 @@ impl InvocationQuery { handler_ty: WorkflowHandlerType::Workflow, }, None, + None, ), InvocationQuery::IdempotencyId(IdempotencyId { service_name, @@ -980,7 +990,7 @@ impl InvocationQuery { VirtualObjectHandlerType::Exclusive, ), }; - InvocationId::generate(&target, Some(idempotency_key.deref())) + InvocationId::generate(&target, None, Some(idempotency_key.deref())) } } } @@ -1071,9 +1081,11 @@ mod mocks { span_context: Default::default(), headers: vec![], execution_time: None, + concurrency: None, completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, + if_exists: None, } } }