From 4f633eefdb909851e8e251fcdd3e4a8d81772bb0 Mon Sep 17 00:00:00 2001 From: Eirik A Date: Mon, 3 Apr 2023 13:22:59 +0100 Subject: [PATCH] Make `VersionMatch` follow upstream + configure list semantics in `watcher::Config` (#1171) * Make `VersionMatch` follow upstream + decouple enum from `watcher` Effectively makes the `VersionMatch` an exact duplicate of the upstream, and adds some builders here and there to compensate for the missing `Any`. Have changed the watcher interface and made a new enum that maps onto `VersionMatch` instead, because it does not make sense to run a watcher against a pinned resource version; it will always use 0. (at least unless/until we build pagination into it). We could keep the enum that wraps resource version, but this is awkward; it would be the only one that bundles resourceVersion that way; - watch api calls get it as a str (outside watchparams) - get api calls (in a follow-up pr) will also not use the enum so altogether we are just making it awkward for ourselves by trying to make it nice :( Signed-off-by: clux * better enum variant names in kube-runtime + less aggressive docs Signed-off-by: clux * rename enum variant, clean up tests and helper methods Signed-off-by: clux * use verbs to be more consistent Signed-off-by: clux * Apply suggestions from code review Co-authored-by: Natalie Signed-off-by: Eirik A * use match rather than if/else on enum Signed-off-by: clux * fix tests Signed-off-by: clux * remove unset variant, replace with Option Signed-off-by: clux * fix deny Signed-off-by: clux * also fix kube-runtime tests Signed-off-by: clux * ws Signed-off-by: clux * use kubernetes terminology for enum and avoid changing default behaviour Signed-off-by: clux * leftover any Signed-off-by: clux * put caveats on MostRecent Signed-off-by: clux --------- Signed-off-by: clux Signed-off-by: Eirik A Co-authored-by: Natalie --- deny.toml | 4 ++ kube-core/src/params.rs | 109 ++++++++++++++++++++++------------- kube-core/src/request.rs | 111 ++++++++++++++---------------------- kube-runtime/src/watcher.rs | 72 ++++++++++++++++------- 4 files changed, 167 insertions(+), 129 deletions(-) diff --git a/deny.toml b/deny.toml index 8a5e24922..01689d056 100644 --- a/deny.toml +++ b/deny.toml @@ -99,3 +99,7 @@ name = "syn" # waiting for pem to bump base64 # https://github.com/jcreekmore/pem-rs/blob/master/Cargo.toml#L16 name = "base64" + +[[bans.skip]] +# deep in dependency tree, only dual use via dev dependency +name = "redox_syscall" diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index 3aaab39bb..cae1b14fa 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -1,35 +1,38 @@ //! A port of request parameter *Optionals from apimachinery/types.go -use std::fmt; - use crate::request::Error; use serde::Serialize; -/// Controls how the resourceVersion parameter is applied +/// Controls how the resource version parameter is applied for list calls +/// +/// Not specifying a `VersionMatch` strategy will give you different semantics +/// depending on what `resource_version`, `limit`, `continue_token` you include with the list request. /// -/// This embeds the resource version when using the `NotOlderThan` or `Exact` variants. /// See for details. -#[derive(Clone, Debug, Default, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum VersionMatch { - /// Matches data with the latest version available in the kube-apiserver database (etcd) (quorum read required). - #[default] - MostRecent, - /// Matches data with the latest version available in the kube-apiserver cache. - Any, - /// Matches data at least as new as the provided resourceVersion. - NotOlderThan(String), - /// Matches data at the exact resourceVersion provided. - Exact(String), -} + /// Returns data at least as new as the provided resource version. + /// + /// The newest available data is preferred, but any data not older than the provided resource version may be served. + /// This guarantees that the collection's resource version is not older than the requested resource version, + /// but does not make any guarantee about the resource version of any of the items in that collection. + /// + /// ### Any Version + /// A degenerate, but common sub-case of `NotOlderThan` is when used together with `resource_version` "0". + /// + /// It is possible for a "0" resource version request to return data at a much older resource version + /// than the client has previously observed, particularly in HA configurations, due to partitions or stale caches. + /// Clients that cannot tolerate this should not use this semantic. + NotOlderThan, -impl fmt::Display for VersionMatch { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - VersionMatch::MostRecent => write!(f, "MostRecent"), - VersionMatch::Any => write!(f, "Any"), - VersionMatch::NotOlderThan(s) => write!(f, "NotOlderThan[{}]", s), - VersionMatch::Exact(s) => write!(f, "Exact[{}]", s), - } - } + /// Return data at the exact resource version provided. + /// + /// If the provided resource version is unavailable, the server responds with HTTP 410 "Gone". + /// For list requests to servers that honor the resource version Match parameter, this guarantees that the collection's + /// resource version is the same as the resource version you requested in the query string. + /// That guarantee does not apply to the resource version of any items within that collection. + /// + /// Note that `Exact` cannot be used with resource version "0". For the most up-to-date list; use `Unset`. + Exact, } /// Common query parameters used in list/delete calls on collections @@ -62,23 +65,27 @@ pub struct ListParams { /// After listing results with a limit, a continue token can be used to fetch another page of results. pub continue_token: Option, - /// Determines how resourceVersion is applied to list calls. - /// See for - /// details. - pub version_match: VersionMatch, + /// Determines how resourceVersion is matched applied to list calls. + pub version_match: Option, + + /// An explicit resourceVersion using the given `VersionMatch` strategy + /// + /// See for details. + pub resource_version: Option, } impl ListParams { pub(crate) fn validate(&self) -> Result<(), Error> { - match &self.version_match { - VersionMatch::Exact(resource_version) | VersionMatch::NotOlderThan(resource_version) => { - if resource_version == "0" { - return Err(Error::Validation( - "ListParams::version_match cannot be equal to \"0\" for Exact and NotOlderThan variants.".into(), - )); - } + if let Some(rv) = &self.resource_version { + if self.version_match == Some(VersionMatch::Exact) && rv == "0" { + return Err(Error::Validation( + "A non-zero resource_version is required when using an Exact match".into(), + )); } - _ => (), + } else if self.version_match.is_some() { + return Err(Error::Validation( + "A resource_version is required when using an explicit match".into(), + )); } Ok(()) } @@ -90,6 +97,7 @@ impl ListParams { /// ``` /// use kube::api::ListParams; /// let lp = ListParams::default() +/// .match_any() /// .timeout(60) /// .labels("kubernetes.io/lifecycle=spot"); /// ``` @@ -139,12 +147,35 @@ impl ListParams { self } - /// Sets resource version and resource version match. + /// Sets the resource version #[must_use] - pub fn version_match(mut self, version_match: VersionMatch) -> Self { - self.version_match = version_match; + pub fn at(mut self, resource_version: &str) -> Self { + self.resource_version = Some(resource_version.into()); self } + + /// Sets an arbitary resource version match strategy + /// + /// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotGreaterThan` + /// requires an explicit `resource_version` set to pass request validation. + #[must_use] + pub fn matching(mut self, version_match: VersionMatch) -> Self { + self.version_match = Some(version_match); + self + } + + /// Use the semantic "any" resource version strategy + /// + /// This is a less taxing variant of the default list, returning data at any resource version. + /// It will prefer the newest avialable resource version, but strong consistency is not required; + /// data at any resource version may be served. + /// It is possible for the request to return data at a much older resource version than the client + /// has previously observed, particularly in high availability configurations, due to partitions or stale caches. + /// Clients that cannot tolerate this should not use this semantic. + #[must_use] + pub fn match_any(self) -> Self { + self.matching(VersionMatch::NotOlderThan).at("0") + } } /// The validation directive to use for `fieldValidation` when using server-side apply. diff --git a/kube-core/src/request.rs b/kube-core/src/request.rs index 003d4ae55..bd5c3a3b3 100644 --- a/kube-core/src/request.rs +++ b/kube-core/src/request.rs @@ -72,18 +72,15 @@ impl Request { qp.append_pair("continue", continue_token); } + if let Some(rv) = &lp.resource_version { + qp.append_pair("resourceVersion", rv.as_str()); + } match &lp.version_match { - VersionMatch::MostRecent => (), - VersionMatch::Any => { - qp.append_pair("resourceVersion", "0"); - qp.append_pair("resourceVersionMatch", "NotOlderThan"); - } - VersionMatch::NotOlderThan(resource_version) => { - qp.append_pair("resourceVersion", resource_version.as_str()); + None => {} + Some(VersionMatch::NotOlderThan) => { qp.append_pair("resourceVersionMatch", "NotOlderThan"); } - VersionMatch::Exact(resource_version) => { - qp.append_pair("resourceVersion", resource_version.as_str()); + Some(VersionMatch::Exact) => { qp.append_pair("resourceVersionMatch", "Exact"); } } @@ -319,18 +316,15 @@ impl Request { qp.append_pair("continue", continue_token); } + if let Some(rv) = &lp.resource_version { + qp.append_pair("resourceVersion", rv.as_str()); + } match &lp.version_match { - VersionMatch::MostRecent => (), - VersionMatch::Any => { - qp.append_pair("resourceVersion", "0"); + None => {} + Some(VersionMatch::NotOlderThan) => { qp.append_pair("resourceVersionMatch", "NotOlderThan"); } - VersionMatch::NotOlderThan(resource_version) => { - qp.append_pair("resourceVersion", resource_version.as_str()); - qp.append_pair("resourceVersionMatch", "NotOlderThan"); - } - VersionMatch::Exact(resource_version) => { - qp.append_pair("resourceVersion", resource_version.as_str()); + Some(VersionMatch::Exact) => { qp.append_pair("resourceVersionMatch", "Exact"); } } @@ -402,6 +396,7 @@ mod test { request::Request, resource::Resource, }; + use http::header; use k8s::{ admissionregistration::v1 as adregv1, apps::v1 as appsv1, authorization::v1 as authv1, autoscaling::v1 as autoscalingv1, batch::v1 as batchv1, core::v1 as corev1, @@ -418,10 +413,7 @@ mod test { let url = corev1::Secret::url_path(&(), Some("ns")); let req = Request::new(url).create(&PostParams::default(), vec![]).unwrap(); assert_eq!(req.uri(), "/api/v1/namespaces/ns/secrets?"); - assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME - ); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); } #[test] @@ -521,34 +513,31 @@ mod test { let req = Request::new(url).get_metadata("mydeploy").unwrap(); assert_eq!(req.uri(), "/apis/apps/v1/namespaces/ns/deployments/mydeploy"); assert_eq!(req.method(), "GET"); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME - ); - assert_eq!( - req.headers().get(http::header::ACCEPT).unwrap(), + req.headers().get(header::ACCEPT).unwrap(), super::JSON_METADATA_MIME ); } #[test] fn list_path() { let url = appsv1::Deployment::url_path(&(), Some("ns")); - let gp = ListParams::default(); - let req = Request::new(url).list(&gp).unwrap(); + let lp = ListParams::default(); + let req = Request::new(url).list(&lp).unwrap(); assert_eq!(req.uri(), "/apis/apps/v1/namespaces/ns/deployments"); } #[test] fn list_metadata_path() { let url = appsv1::Deployment::url_path(&(), Some("ns")); - let gp = ListParams::default(); - let req = Request::new(url).list_metadata(&gp).unwrap(); - assert_eq!(req.uri(), "/apis/apps/v1/namespaces/ns/deployments"); + let lp = ListParams::default().matching(VersionMatch::NotOlderThan).at("5"); + let req = Request::new(url).list_metadata(&lp).unwrap(); assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME + req.uri(), + "/apis/apps/v1/namespaces/ns/deployments?&resourceVersion=5&resourceVersionMatch=NotOlderThan" ); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); assert_eq!( - req.headers().get(http::header::ACCEPT).unwrap(), + req.headers().get(header::ACCEPT).unwrap(), super::JSON_METADATA_LIST_MIME ); } @@ -571,12 +560,9 @@ mod test { req.uri(), "/api/v1/namespaces/ns/pods?&watch=true&resourceVersion=0&timeoutSeconds=290" ); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME - ); - assert_eq!( - req.headers().get(http::header::ACCEPT).unwrap(), + req.headers().get(header::ACCEPT).unwrap(), super::JSON_METADATA_MIME ); } @@ -589,10 +575,7 @@ mod test { }; let req = Request::new(url).replace("myds", &pp, vec![]).unwrap(); assert_eq!(req.uri(), "/apis/apps/v1/daemonsets/myds?&dryRun=All"); - assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME - ); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); } #[test] @@ -602,10 +585,7 @@ mod test { let req = Request::new(url).delete("myrs", &dp).unwrap(); assert_eq!(req.uri(), "/apis/apps/v1/namespaces/ns/replicasets/myrs"); assert_eq!(req.method(), "DELETE"); - assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME - ); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); } #[test] @@ -619,10 +599,7 @@ mod test { "/apis/apps/v1/namespaces/ns/replicasets?&labelSelector=app%3Dmyapp" ); assert_eq!(req.method(), "DELETE"); - assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME - ); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); } #[test] @@ -657,11 +634,11 @@ mod test { .unwrap(); assert_eq!(req.uri(), "/api/v1/namespaces/ns/pods/mypod?"); assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), + req.headers().get(header::CONTENT_TYPE).unwrap(), Patch::Merge(()).content_type() ); assert_eq!( - req.headers().get(http::header::ACCEPT).unwrap(), + req.headers().get(header::ACCEPT).unwrap(), super::JSON_METADATA_MIME ); assert_eq!(req.method(), "PATCH"); @@ -675,10 +652,7 @@ mod test { .unwrap(); assert_eq!(req.uri(), "/api/v1/nodes/mynode/status?"); assert_eq!(req.method(), "PUT"); - assert_eq!( - req.headers().get(http::header::CONTENT_TYPE).unwrap(), - super::JSON_MIME - ); + assert_eq!(req.headers().get(header::CONTENT_TYPE).unwrap(), super::JSON_MIME); } #[test] @@ -761,7 +735,7 @@ mod test { #[test] fn list_pods_from_cache() { let url = corev1::Pod::url_path(&(), Some("ns")); - let gp = ListParams::default().version_match(VersionMatch::Any); + let gp = ListParams::default().match_any(); let req = Request::new(url).list(&gp).unwrap(); assert_eq!( req.uri().query().unwrap(), @@ -772,7 +746,7 @@ mod test { #[test] fn list_most_recent_pods() { let url = corev1::Pod::url_path(&(), Some("ns")); - let gp = ListParams::default().version_match(VersionMatch::MostRecent); + let gp = ListParams::default(); let req = Request::new(url).list(&gp).unwrap(); assert_eq!( req.uri().query().unwrap(), @@ -783,17 +757,18 @@ mod test { #[test] fn list_invalid_resource_version_combination() { let url = corev1::Pod::url_path(&(), Some("ns")); - let gp = ListParams::default().version_match(VersionMatch::NotOlderThan("0".to_string())); + let gp = ListParams::default().at("0").matching(VersionMatch::Exact); let err = Request::new(url).list(&gp).unwrap_err(); - assert!(format!("{err}") - .contains("version_match cannot be equal to \"0\" for Exact and NotOlderThan variants")); + assert!(format!("{err}").contains("non-zero resource_version is required when using an Exact match")); } #[test] fn list_not_older() { let url = corev1::Pod::url_path(&(), Some("ns")); - let gp = ListParams::default().version_match(VersionMatch::NotOlderThan("20".to_string())); + let gp = ListParams::default() + .at("20") + .matching(VersionMatch::NotOlderThan); let req = Request::new(url).list(&gp).unwrap(); assert_eq!( req.uri().query().unwrap(), @@ -804,12 +779,10 @@ mod test { #[test] fn list_exact_match() { let url = corev1::Pod::url_path(&(), Some("ns")); - let gp = ListParams::default().version_match(VersionMatch::Exact("500".to_string())); + let gp = ListParams::default().at("500").matching(VersionMatch::Exact); let req = Request::new(url).list(&gp).unwrap(); - assert_eq!( - req.uri().query().unwrap(), - "&resourceVersion=500&resourceVersionMatch=Exact" - ); + let query = req.uri().query().unwrap(); + assert_eq!(query, "&resourceVersion=500&resourceVersionMatch=Exact"); } #[test] diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 6f81ef070..fe62202cd 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -154,6 +154,32 @@ struct FullObject<'a, K> { api: &'a Api, } +/// Configurable list semantics for `watcher` relists +#[derive(Clone, Default, Debug, PartialEq)] +pub enum ListSemantic { + /// List calls perform a full quorum read for most recent results + /// + /// Prefer this if you have strong consistency requirements. Note that this + /// is more taxing for the apiserver and can be less scalable for the cluster. + /// + /// If you are observing large resource sets (such as congested `Controller` cases), + /// you typically have a delay between the list call completing, and all the events + /// getting processed. In such cases, it is probably worth picking `Any` over `MostRecent`, + /// as your events are not guaranteed to be up-to-date by the time you get to them anyway. + #[default] + MostRecent, + + /// List calls returns cached results from apiserver + /// + /// This is faster and much less taxing on the apiserver, but can result + /// in much older results than has previously observed for `Restarted` events, + /// particularly in HA configurations, due to partitions or stale caches. + /// + /// This option makes the most sense for controller usage where events have + /// some delay between being seen by the runtime, and it being sent to the reconciler. + Any, +} + /// Accumulates all options that can be used on the watcher invocation. pub struct Config { /// A selector to restrict the list of returned objects by their labels. @@ -173,34 +199,26 @@ pub struct Config { /// We limit this to 295s due to [inherent watch limitations](https://github.com/kubernetes/kubernetes/issues/6513). pub timeout: Option, - /// Determines how resourceVersion is applied to list calls. - /// See for - /// details. - pub version_match: VersionMatch, + /// Semantics for list calls. + /// + /// Configures re-list for performance vs. consistency. + pub list_semantic: ListSemantic, /// Enables watch events with type "BOOKMARK". /// - /// Servers that do not implement bookmarks ignore this flag and - /// bookmarks are sent at the server's discretion. Clients should not - /// assume bookmarks are returned at any specific interval, nor may they - /// assume the server will send any BOOKMARK event during a session. - /// If this is not a watch, this field is ignored. - /// If the feature gate WatchBookmarks is not enabled in apiserver, - /// this field is ignored. + /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls. + /// This is default enabled and should generally not be turned off. pub bookmarks: bool, } impl Default for Config { - /// Default `WatchParams` without any constricting selectors fn default() -> Self { Self { - // bookmarks stable since 1.17, and backwards compatible bookmarks: true, - label_selector: None, field_selector: None, timeout: None, - version_match: VersionMatch::default(), + list_semantic: ListSemantic::default(), } } } @@ -246,13 +264,19 @@ impl Config { self } - /// Sets resource version and resource version match. + /// Sets list semantic to configure re-list performance and consistency #[must_use] - pub fn version_match(mut self, version_match: VersionMatch) -> Self { - self.version_match = version_match; + pub fn list_semantic(mut self, semantic: ListSemantic) -> Self { + self.list_semantic = semantic; self } + /// Sets list semantic to `Any` to improve list performance + #[must_use] + pub fn any_semantic(self) -> Self { + self.list_semantic(ListSemantic::Any) + } + /// Disables watch bookmarks to simplify watch handling /// /// This is not recommended to use with production watchers as it can cause desyncs. @@ -265,11 +289,17 @@ impl Config { /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests. fn to_list_params(&self) -> ListParams { + let version_match = match self.list_semantic { + ListSemantic::Any => Some(VersionMatch::NotOlderThan), + ListSemantic::MostRecent => None, + }; ListParams { label_selector: self.label_selector.clone(), field_selector: self.field_selector.clone(), timeout: self.timeout, - version_match: self.version_match.clone(), + version_match, + /// we always do a full re-list when getting desynced + resource_version: Some("0".into()), // It is not permissible for users to configure the continue token and limit for the watcher, as these parameters are associated with paging. // The watcher must handle paging internally. limit: None, @@ -444,7 +474,7 @@ where /// Trampoline helper for `step_trampolined` async fn step( api: &A, - watcher_config: &Config, + config: &Config, mut state: State, ) -> (Result>, State) where @@ -452,7 +482,7 @@ where A::Value: Resource + 'static, { loop { - match step_trampolined(api, watcher_config, state).await { + match step_trampolined(api, config, state).await { (Some(result), new_state) => return (result, new_state), (None, new_state) => state = new_state, }