Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added in a first attempt at a solution to sharing watchers between stores and controllers #1128

Closed
wants to merge 6 commits into from

Conversation

danrspencer
Copy link
Contributor

@danrspencer danrspencer commented Jan 27, 2023

Motivation

We run quite a complex Kubernetes Operator, with lots of stores and controllers. As the project has grown we've shifted the controller logic out into many independent libraries with the eventual goal of having a binary which is just a runner for all of our small unit testable controller libraries.

While this approach has been largely successful we're still grappling with a few concerns that we'd like to address:

  1. We have more than one cache of the same data from Kubernetes in our service (Issue 1080). We attempted to address this by having a god object in the primary service, but it ended up having knowing way too much about the things it was running; and it didn't solve the problem of controllers running on different stores to the stores.

  2. Stores startup is racey against controllers (Discussion 1124). Before you can run a controller you have to ensure that all stores passed to it have received at least one event otherwise you run the risk of reconciling against incorrect (missing) data.

  3. Running everything in a consistent manner. This one is probably less of a general problem and more specific to the way we've implemented things; but with all of these controllers and stores (especially given point 2) running all of these streams gets complicated.

Solution

N.B. The solution is still rough / incomplete in a few areas. I’m creating a draft PR now for feedback if this is something that could be considered valuable for the kube-runtime lib.


The struct currently known as SharedStore (probably not the right name but it's a starting point) wraps a single Api and allows for the creation of stores and controllers from that Api. Here's how it addresses each of the concerns we had above:

  1. Whenever a store is requested it's stored inside the SharedStore and indexed by its ListParams, if the same store (e.g. same ListParams) is requested again (either for a controller or a store) then the existing store is returned. This approach keeps the details of what's being requested close to the call point, with the head of the application only needs to know what types of resources are being managed.

  2. Each SharedStore takes a reference to a ReadyToken to allow communication between SharedStores about their ready status. Each new store requested registers itself against the ReadyToken and marks itself as ready after receiving the first event. When everything is ran (see point 3) the controllers will only begin running once every store has received its first event.

  3. The SharedStore bundles up all of the streams for each store and controller and follows a pattern similar to Controller::new(...).run(...) where it consumes itself and returns a stream. The stream will only poll the controller streams once the ReadyToken has marked itself as ready.

Here's a contrived example of how it could work:

async fn main(client: Client) {
    let ready_token = ReadyToken::new();
    let mut pod_shared_store = SharedStore::new(Api::<Pod>::all(client.clone()), &ready_token);
    let mut cm_shared_store = SharedStore::new(Api::<ConfigMap>::all(client.clone()), &ready_token);
    let mut pvc_shared_store = SharedStore::new(Api::<PersistentVolumeClaim>::all(client), &ready_token);

    setup_pod_controller(&mut pod_shared_store, &mut cm_shared_store);
    setup_pvc_controller(&mut pvc_shared_store, &mut pod_shared_store, &mut cm_shared_store);

    select_all(vec![
        pod_shared_store.run().map(|_| ()).boxed(),
        cm_shared_store.run().map(|_| ()).boxed(),
        pvc_shared_store.run().map(|_| ()).boxed(),
    ])
    .for_each(|_| async {})
    .await;
}

fn setup_pod_controller(
    pod_shared_store: &mut SharedStore<Pod>,
    cm_shared_store: &mut SharedStore<ConfigMap>,
) {
    let cm_store = cm_shared_store.store(ListParams::default().labels(“app=nginx”));

    pod_shared_store.controller(
        ListParams::default().labels(“app=nginx”),
        move |pod, ctx| async move {
            unimplemented!(“do something with the pod and the config map store”);
        },
        |_, _: &Error, _| unimplemented!(“error”),
        Arc::new(cm_store),
    );
}

fn setup_pvc_controller(
    pvc_shared_store: &mut SharedStore<PersistentVolumeClaim>,
    pod_shared_store: &mut SharedStore<Pod>,
    cm_shared_store: &mut SharedStore<ConfigMap>,
) {
    // Will get the same store that’s feeding the controller above
    let pod_store = pod_shared_store.store(ListParams::default().labels(“app=nginx”));
    // Gets a different store to the CM store above because it has different labels
    let cm_store = cm_shared_store.store(ListParams::default().labels(“app=other”));

    pvc_shared_store.controller(
        ListParams::default().labels(“app=nginx”),
        move |pod, ctx| async move {
            unimplemented!(“do something with the pvc and the config map store”);
        },
        |_, _: &Error, _| unimplemented!(“error”),
        Arc::new((pod_store, cm_store)),
    );
}

@clux
Copy link
Member

clux commented Jan 28, 2023

Thanks a lot for taking the initiative on this and getting a reasonable starting PR out. There's definitely some good ideas here, even though it'll take some time to get something as ambitious as this over the line.

I'll put some comments on various bits of the bits here (starting with the easier* stuff) over the next coming days. Will need to think a bit about this as well.

Comment on lines 18 to 22
pub struct Prism<K> {
reflector: BoxStream<'static, watcher::Result<Event<K>>>,
ready_state: ReadyState,
sender: broadcast::Sender<Event<K>>,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll start here. This Prism/SharedStore pattern can work, but it feels a bit inside out.

Ideally, this Prism logic can be a method on WatchStreamExt, rather than a public top level type because then it is compatible with both reflector and watcher. That might actually be pretty close to what you have with WatchStreamExt::subscribe returning a Prism AFAIKT since you impl Stream for it.

For the second problem; blocking until ready; I feel like that can maybe be done in WatchStreamExt as well. We already have WatchStreamExt::backoff there as a way to limit flow, so it should be possible to do something similar to wait until we have seen ONE element (a Once thing inside a struct that impls stream can maybe embed a token) without having the user manage/pass ready tokens around. That way we could conceivably do something like:

let cm_stream = reflector(writer, watcher(...)).once_ready().await;
let cm_stream_cpy = cm_stream.subscribe();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(it would be a minor performance hit to do multiple .once_ready().awaits sequentially, but I think this can be worked around with a join_all before creating subscriptions.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cheers for the suggestions; I'll look at moving some of this stuff around onto the WatchStreamExt. I'm still on a pretty steep learning curve with streams so you'll have to bear with me on lots of boxed streams until I fully get my head wrapped around things properly 😅.

I have a concern with the once_ready suggestion in that it could be easy to forget, or not discovered, until you get bitten by a bug caused by a race condition.

If we're still thinking of something like the SharedStore struct that provides a wrapper over Store creation then that alleviate that concern somewhat. Easy mode would be shared_store.store(lp).await and if more fine gained control is wanted then it's perhaps not unreasonable to expect people to invest a bit more in learning the API.

The token approach does allow the Store creation to remain synchronous rather than async (although I'm not sure how valuable that actually is).

Copy link
Contributor Author

@danrspencer danrspencer Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more thoughts on once_ready (because it's not far from how we currently handle our stores and trying to implement something like it reminded me of some of the issues we had to tackle).

It would probably need to not only be async, but also return be resulty since the stream could end before the await completes successfully. Our implementation also takes a timeout so that the future won't wait indefinitely if the stream never successfully receives an event, which is another type of failures that needs to be handled.

One of my aims with the initial approach was to defer these problems away from setup to the point where you start consuming the streams.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point about the "missed once_ready" type mistake. Not sure what the best way for that would be. Possible you could do a wrapper type for the stream to encapsulate the "post-init" state, but that's pretty hairy to do on streams (maybe even too hairy to be worth it).

As for the Result problem, you could still collect futures (of once_ready results) early without awaiting them, then do a try_join! on all the streams before subscribing, but I guess that does not plug into timeout easily? We might have to supply a helper that does a raw select! to do this nicely possibly. It sounds brittle 😞

Copy link
Contributor Author

@danrspencer danrspencer Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just going to re-iterate the specific problem we're trying to solve so it's clear in my head.

When you call .state() on a Store you may get a result that hasn't yet being initialised. We're currently trying to solve the init problem; either by not running controllers until all the stores have reported ready, or by supplying a future that can be waited until a store is ready. Once we have declared the store "ready" in some way we know it's safe to use.

Perhaps we're tackling the problem in the wrong place? We could provide a wrapper over store (SafeStore?), or (more radically) change the interface of Store itself so that calling .state() or .get() are async. They won't return a result until the Store has initialised but once it has will always instantly poll ready with the answer.

Copy link
Contributor Author

@danrspencer danrspencer Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've had a think about the approach I started here and realised the I was bundling everything together specifically to solve the race condition problem. If you can solve the race condition in a different way you can unbundle everything.

Given that; I've pared things right back and deleted all of the Controller stuff for now to focus on the duplicate stores and race condition (which are separate concerns which probably belong in their own PRs, keeping it all here for now for historical reasons while it's discussed).

I've added in a struct called SafeStore which exposes async versions of the Store methods. They will block until the Store has received the first event and then always instantly return the value. Instances of SafeStore can be given to controllers with no worry about race conditions.

I've also added a subscribable method in watch_ext which returns a version of the watch stream that exposes a subscribe_ok method. This will allow controllers to hook into the same stream that a store uses (probably with an approach something like you discussed in Issue 1080).

As I mentioned above SharedStore has dropped all notions of handling controllers. It now just provides Stores with the same caching mechanism as before. Additional I've added in subscribe methods to the SharedStore, allowing controllers to pull their streams from them.

To revisit the original example I put above:

fn setup_pvc_controller(
    namespace: &str,
    pvc_shared_store: &mut SharedStore<PersistentVolumeClaim>,
    pod_shared_store: &mut SharedStore<Pod>,
    cm_shared_store: &mut SharedStore<ConfigMap>,
) {
    let pod_store = pod_shared_store.all(ListParams::default().labels(“app=nginx”));
    let cm_store = cm_shared_store.all(ListParams::default().labels(“app=other”));

    // Given that to setup a controller you'll always call both of these method, perhaps there should be a single
    // call that returns both the store and the subscription?
    let pvc_store = pvc_shared_store.namespaced(namespace, ListParams::default().labels(“app=nginx”))
    let pvc_stream = pvc_shared_store.namespaced_subscribe(namespace, ListParams::default().labels(“app=nginx”))

   // Below syntax is entire made up
   Controller::new_from(pvc_store, pvc_stream).run(....)
}

Comment on lines +9 to +19
#[pin_project]
/// todo - docs
#[must_use = "streams do nothing unless polled"]
pub struct StreamSubscribable<S>
where
S: TryStream,
{
#[pin]
stream: S,
sender: broadcast::Sender<S::Ok>,
}
Copy link
Member

@clux clux Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do like the setup of this now. It's fairly orthogonal to the rest of this PR and should have wide usability outside of the more ambitious scope of this PR, so it might be worth cleaning this file up (plus the watch_ext callee) in an own PR so we can reduce the scope here.

There's only tests and docs (incl a doc example) missing. Tests can follow other WatchStreamExt setup and check some reasonable edge cases; more than 1 auxiliary subscriber, and verify that sending an Err propagates the err everywhere, etc.

#[pin_project]
/// todo - docs
#[must_use = "streams do nothing unless polled"]
pub struct StreamSubscribable<S>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call this StreamSubscribe to be more in line with normal, unconjugated naming of https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html

Comment on lines +33 to +36
pub async fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
self.ready.ready().await;
self.store.get(key)
}
Copy link
Member

@clux clux Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little worried about everything hinging on this future not being ready. This means we can pass around SafeStores that are not actually ready. Given that "being ready" is a one-time thing to wait, it feels to me that this should be done AS we transition INTO the SafeStore. This also would simplify our usage in more complicated bits because we can just assume the invariant than have to account for the possibility of it with async (which is sometimes hard inside combinator style async code).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

effectively what i want is an async TryFrom style conversion (maybe via https://docs.rs/async-convert/latest/async_convert/trait.TryFrom.html - haven't really evaluated this crate)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main thing I'm trying to address is decoupling setup from usage. If the await was on the transition into of SafeStore you still need to start consuming the watcher stream before you could get a SafeStore to pass to a Controller. That would make wiring together lots of stores and controllers much more complicated.

Maybe a better alternative would be to return a Result::Err from the store if not ready; at least that way if you do hit the race condition (or you forget to start consuming your watcher stream) you just get an error StoreNotReady rather than an await that could potentially hang forever.

I'll focus on tidying up the StreamSubscribe for now then circle back around to think more about this (and have a look into the crate you've linked).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants