Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Decoupling concurrency guarantees/behavior on existing invocation id in invocation request #2393

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions crates/types/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::errors::IdDecodeError;
use crate::id_util::IdDecoder;
use crate::id_util::IdEncoder;
use crate::id_util::IdResourceType;
use crate::invocation;
use crate::invocation::{InvocationTarget, InvocationTargetType, WorkflowHandlerType};
use crate::time::MillisSinceEpoch;

Expand Down Expand Up @@ -130,13 +131,19 @@ pub type EntryIndex = u32;
/// which identifies a consecutive range of partition keys.
pub type PartitionKey = u64;

/// Returns the partition key computed from either the service_key, or idempotency_key, if possible
/// Derives the partition key from the first key that is present, or returns `None` when
/// none of them is.
///
/// The keys are considered in this order:
///
/// 1. `service_key`
/// 2. `concurrency_key`
/// 3. `idempotency_key`
///
/// Only the selected key is hashed.
fn deterministic_partition_key(
    service_key: Option<&str>,
    concurrency_key: Option<&str>,
    idempotency_key: Option<&str>,
) -> Option<PartitionKey> {
    // `Option<&str>` is `Copy`, so picking the winning key first and hashing it once is
    // equivalent to hashing lazily per candidate.
    let selected_key = service_key.or(concurrency_key).or(idempotency_key);
    selected_key.map(partitioner::HashPartitioner::compute_partition_key)
}

Expand Down Expand Up @@ -422,12 +429,19 @@ pub trait WithInvocationId {
pub type EncodedInvocationId = [u8; InvocationId::SIZE_IN_BYTES];

impl InvocationId {
pub fn generate(invocation_target: &InvocationTarget, idempotency_key: Option<&str>) -> Self {
pub fn generate(
invocation_target: &InvocationTarget,
invocation_concurrency: Option<&invocation::Concurrency>,
idempotency_key: Option<&str>,
) -> Self {
// --- Partition key generation
let partition_key =
// Either try to generate the deterministic partition key, if possible
deterministic_partition_key(
invocation_target.key().map(|bs| bs.as_ref()),
invocation_target.key()
.map(|bs| bs.as_ref()),
invocation_concurrency
.and_then(|c| c.inbox_key().map(|bs| bs.as_ref())),
idempotency_key,
)
// If no deterministic partition key can be generated, just pick a random number
Expand Down Expand Up @@ -585,6 +599,8 @@ impl IdempotencyId {
// * For services with key, the partition key is the hash(service key); this is due to the virtual object locking requirement.
let partition_key = deterministic_partition_key(
service_key.as_ref().map(|bs| bs.as_ref()),
// This data structure will no longer need the partition key by the time we enable the new custom concurrency feature.
None,
Some(&idempotency_key),
)
.expect("A deterministic partition key can always be generated for idempotency id");
Expand Down Expand Up @@ -975,7 +991,7 @@ mod mocks {

impl InvocationId {
pub fn mock_generate(invocation_target: &InvocationTarget) -> Self {
InvocationId::generate(invocation_target, None)
InvocationId::generate(invocation_target, None, None)
}

pub fn mock_random() -> Self {
Expand Down Expand Up @@ -1132,11 +1148,28 @@ mod tests {
let idempotent_key = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

assert_eq!(
InvocationId::generate(&invocation_target, Some(&idempotent_key)),
InvocationId::generate(&invocation_target, Some(&idempotent_key))
InvocationId::generate(&invocation_target, None, Some(&idempotent_key)),
InvocationId::generate(&invocation_target, None, Some(&idempotent_key))
);
}

#[test]
fn deterministic_partition_key_for_service_request() {
    let target = InvocationTarget::mock_service();
    let sequential = invocation::Concurrency::Sequential {
        inbox_target: target.service_name().clone(),
        inbox_key: Alphanumeric
            .sample_string(&mut rand::thread_rng(), 16)
            .into(),
    };

    let first = InvocationId::generate(&target, Some(&sequential), None);
    let second = InvocationId::generate(&target, Some(&sequential), None);

    // The same concurrency key must land on the same partition, while every generated
    // invocation still gets its own uuid.
    assert_eq!(first.partition_key(), second.partition_key());
    assert_ne!(first.invocation_uuid(), second.invocation_uuid());
}

#[test]
fn deterministic_invocation_id_for_workflow_request() {
let invocation_target = InvocationTarget::mock_workflow();
Expand Down
100 changes: 94 additions & 6 deletions crates/types/src/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,95 @@ impl fmt::Display for InvocationTarget {
}
}

/// Concurrency guarantee requested for an invocation.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum Concurrency {
/// The invocation executes sequentially with respect to the inbox address,
/// i.e. the pair (`inbox_target`, `inbox_key`).
Sequential {
/// Target part of the inbox address (formerly known as `ServiceId.name`).
inbox_target: ByteString,
/// Key part of the inbox address (formerly known as `ServiceId.key`).
inbox_key: ByteString,
},
/// No queueing: the request is simply executed.
Concurrent,
}

impl Concurrency {
    /// Returns the inbox key of a [`Concurrency::Sequential`] value, or `None` for
    /// [`Concurrency::Concurrent`].
    pub fn inbox_key(&self) -> Option<&ByteString> {
        match self {
            Self::Sequential { inbox_key, .. } => Some(inbox_key),
            Self::Concurrent => None,
        }
    }

    /// Picks the concurrency behavior implied by the invocation target alone.
    ///
    /// Exclusive virtual object handlers run sequentially on the inbox addressed by the
    /// object's name and key; every other target runs concurrently.
    pub fn infer_target_default(invocation_target: &InvocationTarget) -> Concurrency {
        match invocation_target {
            InvocationTarget::Service { .. } => Self::Concurrent,
            InvocationTarget::VirtualObject {
                handler_ty: VirtualObjectHandlerType::Shared,
                ..
            } => Self::Concurrent,
            InvocationTarget::VirtualObject {
                handler_ty: VirtualObjectHandlerType::Exclusive,
                name,
                key,
                ..
            } => Self::Sequential {
                inbox_target: name.clone(),
                inbox_key: key.clone(),
            },
            // For workflows there is no enqueueing: the behavior on an existing
            // invocation id is what guarantees correctness.
            InvocationTarget::Workflow { .. } => Self::Concurrent,
        }
    }
}

/// Behavior to apply when an invocation request is sent and an invocation with the
/// same id already exists.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum IfExists {
/// Attach to the existing invocation.
Attach,
/// Reply with a "conflict" error response.
ReplyConflict,
/// Drop the request.
Drop,
}

impl IfExists {
    /// Picks the behavior to apply on an existing invocation id when the request does
    /// not specify one.
    ///
    /// * A `Workflow` handler of a workflow always yields [`IfExists::ReplyConflict`],
    ///   regardless of the idempotency key.
    /// * Any other target with an idempotency key yields [`IfExists::Attach`].
    /// * Everything else yields [`IfExists::Drop`].
    pub fn infer_target_default(
        invocation_target_type: InvocationTargetType,
        has_idempotency_key: bool,
    ) -> IfExists {
        let is_workflow_run = matches!(
            invocation_target_type,
            InvocationTargetType::Workflow(WorkflowHandlerType::Workflow)
        );

        if is_workflow_run {
            Self::ReplyConflict
        } else if has_idempotency_key {
            Self::Attach
        } else {
            Self::Drop
        }
    }
}

/// Invocation request flow is as follows:
///
/// 1. Invocation is proposed in the PP log.
/// 2. PP will first check if another invocation with the same id exists. If it exists, it applies the [`IfExists`], otherwise moves to point 3.
/// 3. If the invocation id doesn't exist, wait for the `execution_time` if present, otherwise continue immediately.
/// 4. Apply the given [`Concurrency`].
/// 5. Finally execute it sending the request to the service endpoint.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct InvocationRequestHeader {
pub id: InvocationId,
pub target: InvocationTarget,
pub headers: Vec<Header>,
pub span_context: ServiceInvocationSpanContext,

/// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`].
pub idempotency_key: Option<ByteString>,

/// Behavior to apply on an existing invocation id. If not present, [`IfExists::infer_target_default`] should be used.
pub if_exists: Option<IfExists>,
/// Time when the request should be executed. If none, it's executed immediately.
pub execution_time: Option<MillisSinceEpoch>,
/// Concurrency behavior to apply. If not present, [`Concurrency::infer_target_default`] should be used.
pub concurrency: Option<Concurrency>,

/// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`].
/// This value is propagated only for observability purposes, as the invocation id is already deterministic given the idempotency key.
pub idempotency_key: Option<ByteString>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does that need serde(default)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this field is not new, i just moved it to the bottom here.

/// Retention duration of the completed status. If none, the completed status is not retained.
pub completion_retention_duration: Option<Duration>,
}
Expand All @@ -300,6 +376,8 @@ impl InvocationRequestHeader {
idempotency_key: None,
execution_time: None,
completion_retention_duration: None,
if_exists: None,
concurrency: None,
}
}

Expand Down Expand Up @@ -352,10 +430,13 @@ pub struct ServiceInvocation {
pub source: Source,
pub span_context: ServiceInvocationSpanContext,
pub headers: Vec<Header>,
/// Time when the request should be executed
pub idempotency_key: Option<ByteString>,

pub if_exists: Option<IfExists>,
pub execution_time: Option<MillisSinceEpoch>,
pub concurrency: Option<Concurrency>,

pub completion_retention_duration: Option<Duration>,
pub idempotency_key: Option<ByteString>,

// Where to send the response, if any
pub response_sink: Option<ServiceInvocationResponseSink>,
Expand All @@ -382,10 +463,12 @@ impl ServiceInvocation {
span_context: request.header.span_context,
headers: request.header.headers,
execution_time: request.header.execution_time,
concurrency: request.header.concurrency,
completion_retention_duration: request.header.completion_retention_duration,
idempotency_key: request.header.idempotency_key,
response_sink: None,
submit_notification_sink: None,
if_exists: request.header.if_exists,
}
}

Expand All @@ -403,9 +486,11 @@ impl ServiceInvocation {
span_context: ServiceInvocationSpanContext::empty(),
headers: vec![],
execution_time: None,
concurrency: None,
completion_retention_duration: None,
idempotency_key: None,
submit_notification_sink: None,
if_exists: None,
}
}

Expand Down Expand Up @@ -884,6 +969,7 @@ impl InvocationQuery {
handler_ty: WorkflowHandlerType::Workflow,
},
None,
None,
),
InvocationQuery::IdempotencyId(IdempotencyId {
service_name,
Expand All @@ -904,7 +990,7 @@ impl InvocationQuery {
VirtualObjectHandlerType::Exclusive,
),
};
InvocationId::generate(&target, Some(idempotency_key.deref()))
InvocationId::generate(&target, None, Some(idempotency_key.deref()))
}
}
}
Expand Down Expand Up @@ -995,9 +1081,11 @@ mod mocks {
span_context: Default::default(),
headers: vec![],
execution_time: None,
concurrency: None,
completion_retention_duration: None,
idempotency_key: None,
submit_notification_sink: None,
if_exists: None,
}
}
}
Expand Down
Loading