diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index 405fee0..45aa417 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -318,7 +318,8 @@ message CallEntryMessage { // If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise. string key = 5; - string idempotency_key = 6; + // If present, it must be non empty. + optional string idempotency_key = 6; oneof result { bytes value = 14; @@ -349,7 +350,8 @@ message OneWayCallEntryMessage { // If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise. string key = 6; - string idempotency_key = 7; + // If present, it must be non empty. + optional string idempotency_key = 7; // Entry name string name = 12; diff --git a/src/service_protocol/generated/dev.restate.service.protocol.rs b/src/service_protocol/generated/dev.restate.service.protocol.rs index 43f35bd..5391bc6 100644 --- a/src/service_protocol/generated/dev.restate.service.protocol.rs +++ b/src/service_protocol/generated/dev.restate.service.protocol.rs @@ -361,8 +361,9 @@ pub struct CallEntryMessage { /// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise. #[prost(string, tag = "5")] pub key: ::prost::alloc::string::String, - #[prost(string, tag = "6")] - pub idempotency_key: ::prost::alloc::string::String, + /// If present, it must be non empty. + #[prost(string, optional, tag = "6")] + pub idempotency_key: ::core::option::Option<::prost::alloc::string::String>, /// Entry name #[prost(string, tag = "12")] pub name: ::prost::alloc::string::String, @@ -401,8 +402,9 @@ pub struct OneWayCallEntryMessage { /// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise. #[prost(string, tag = "6")] pub key: ::prost::alloc::string::String, - #[prost(string, tag = "7")] - pub idempotency_key: ::prost::alloc::string::String, + /// If present, it must be non empty. + #[prost(string, optional, tag = "7")] + pub idempotency_key: ::core::option::Option<::prost::alloc::string::String>, /// Entry name #[prost(string, tag = "12")] pub name: ::prost::alloc::string::String, diff --git a/src/vm/errors.rs b/src/vm/errors.rs index 6cc45d1..39c6e7d 100644 --- a/src/vm/errors.rs +++ b/src/vm/errors.rs @@ -125,6 +125,11 @@ pub const BAD_COMBINATOR_ENTRY: Error = Error::new_const( "The combinator cannot be replayed. This is most likely caused by non deterministic code.", ); +pub const EMPTY_IDEMPOTENCY_KEY: Error = Error::new_const( + codes::INTERNAL, + "Trying to execute an idempotent request with an empty idempotency key, this is not supported", +); + // Other errors #[derive(Debug, Clone, thiserror::Error)] diff --git a/src/vm/mod.rs b/src/vm/mod.rs index 51df4e0..285a5b4 100644 --- a/src/vm/mod.rs +++ b/src/vm/mod.rs @@ -11,7 +11,9 @@ use crate::service_protocol::messages::{ }; use crate::service_protocol::{Decoder, RawMessage, Version}; use crate::vm::context::{EagerGetState, EagerGetStateKeys}; -use crate::vm::errors::{UnexpectedStateError, UnsupportedFeatureForNegotiatedVersion}; +use crate::vm::errors::{ + UnexpectedStateError, UnsupportedFeatureForNegotiatedVersion, EMPTY_IDEMPOTENCY_KEY, +}; use crate::vm::transitions::*; use crate::{ AsyncResultCombinator, AsyncResultHandle, CancelInvocationTarget, Error, GetInvocationIdTarget, @@ -433,8 +435,15 @@ impl super::VM for CoreVM { ret )] fn sys_call(&mut self, target: Target, input: Bytes) -> VMResult { - if target.idempotency_key.is_some() { - self.verify_feature_support("attach idempotency key to one way call", Version::V3)?; + if let Some(idempotency_key) = &target.idempotency_key { + self.verify_feature_support("attach idempotency key to call", Version::V3)?; + if idempotency_key.is_empty() { + self.do_transition(HitError { + error: EMPTY_IDEMPOTENCY_KEY, + next_retry_delay: None, + })?; + unreachable!(); + } } self.do_transition(SysCompletableEntry( "SysCall", @@ -442,7 +451,7 @@ impl super::VM for CoreVM { service_name: target.service, handler_name: target.handler, key: target.key.unwrap_or_default(), - idempotency_key: target.idempotency_key.unwrap_or_default(), + idempotency_key: target.idempotency_key, parameter: input, ..Default::default() }, @@ -461,8 +470,15 @@ impl super::VM for CoreVM { input: Bytes, delay: Option, ) -> VMResult { - if target.idempotency_key.is_some() { + if let Some(idempotency_key) = &target.idempotency_key { self.verify_feature_support("attach idempotency key to one way call", Version::V3)?; + if idempotency_key.is_empty() { + self.do_transition(HitError { + error: EMPTY_IDEMPOTENCY_KEY, + next_retry_delay: None, + })?; + unreachable!(); + } } self.do_transition(SysNonCompletableEntry( "SysOneWayCall", @@ -470,7 +486,7 @@ impl super::VM for CoreVM { service_name: target.service, handler_name: target.handler, key: target.key.unwrap_or_default(), - idempotency_key: target.idempotency_key.unwrap_or_default(), + idempotency_key: target.idempotency_key, parameter: input, invoke_time: delay .map(|d| {