Skip to content
This repository has been archived by the owner on May 22, 2024. It is now read-only.

Commit

Permalink
Use RwLock instead of Notify (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
kflansburg authored Oct 12, 2021
1 parent 8430e39 commit 047eb1e
Showing 1 changed file with 45 additions and 17 deletions.
62 changes: 45 additions & 17 deletions krator/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use tokio::sync::mpsc::Sender;
use tokio::sync::Notify;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};

use kube::{
Expand Down Expand Up @@ -165,8 +165,8 @@ impl<O: Operator> OperatorRuntime<O> {
) -> anyhow::Result<Sender<ObjectEvent<O::Manifest>>> {
let (sender, mut receiver) = tokio::sync::mpsc::channel::<ObjectEvent<O::Manifest>>(128);

let deleted = Arc::new(Notify::new());
let deleted_event = Arc::new(Notify::new());
let deleted = Arc::new(RwLock::new(false));
let deleted_event = Arc::new(RwLock::new(false));

let object_state = self.operator.initialize_object_state(&manifest).await?;

Expand All @@ -191,7 +191,10 @@ impl<O: Operator> OperatorRuntime<O> {
);
let meta = manifest.meta();
if meta.deletion_timestamp.is_some() {
reflector_deleted.notify_one();
{
let mut event = reflector_deleted.write().await;
*event = true;
}
}
match manifest_tx.send(manifest) {
Ok(()) => (),
Expand All @@ -211,8 +214,14 @@ impl<O: Operator> OperatorRuntime<O> {
?namespace,
"Resource deleted.",
);
reflector_deleted.notify_one();
reflector_deleted_event.notify_one();
{
let mut event = reflector_deleted.write().await;
*event = true;
}
{
let mut event = reflector_deleted_event.write().await;
*event = true;
}
break;
}
}
Expand Down Expand Up @@ -343,13 +352,25 @@ impl<O: Operator> OperatorRuntime<O> {
}
}

async fn wait_event(event: Arc<RwLock<bool>>) {
loop {
{
let event = event.read().await;
if *event {
break;
}
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}

async fn run_object_task<O: Operator>(
client: Client,
manifest: Manifest<O::Manifest>,
shared: SharedState<<O::ObjectState as ObjectState>::SharedState>,
mut object_state: O::ObjectState,
deleted: Arc<Notify>,
deleted_event: Arc<Notify>,
deleted: Arc<RwLock<bool>>,
deleted_event: Arc<RwLock<bool>>,
operator: Arc<O>,
) {
debug!("Running registration hook.");
Expand All @@ -373,7 +394,7 @@ async fn run_object_task<O: Operator>(

tokio::select! {
_ = run_to_completion(&client, state, shared.clone(), &mut object_state, manifest.clone()) => (),
_ = deleted.notified() => {
_ = wait_event(Arc::clone(&deleted)) => {
let state: O::DeletedState = Default::default();
debug!("Object {} in namespace {:?} terminated. Jumping to state {:?}.", name, &namespace, state);
run_to_completion(&client, state, shared.clone(), &mut object_state, manifest.clone()).await;
Expand All @@ -384,7 +405,7 @@ async fn run_object_task<O: Operator>(
"Resource {} in namespace {:?} waiting for deregistration.",
name, namespace
);
deleted.notified().await;
wait_event(Arc::clone(&deleted)).await;
{
let mut state_writer = shared.write().await;
object_state.async_drop(&mut state_writer).await;
Expand All @@ -411,20 +432,27 @@ async fn run_object_task<O: Operator>(
match api_client.delete(&name, &dp).await {
Ok(_) => {
debug!(
"Resource {} in namespace {:?} deregistered.",
name, namespace
?namespace,
%name,
"Object deregistered"
);
}
Err(e) => match e {
// Ignore not found, already deleted. This could happen if resource was force deleted.
kube::error::Error::Api(kube::error::ErrorResponse { code, .. }) if code == 404 => (),
e => {
kube::error::Error::Api(kube::error::ErrorResponse { code, .. }) if code == 404 => {
debug!(?namespace, %name, "Object already deleted")
}
error => {
warn!(
"Unable to deregister resource {} in namespace {:?} with Kubernetes API: {:?}",
name, namespace, e
?namespace,
%name,
?error,
"Unable to deregister object with Kubernetes API"
);
}
},
}
deleted_event.notified().await;

wait_event(deleted_event).await;
debug!(?namespace, %name, "Object deleted");
}

0 comments on commit 047eb1e

Please sign in to comment.