Skip to content

Commit

Permalink
<rshim>Improve the events report
Browse files Browse the repository at this point in the history
1.add timeout in ctx
2.add queue in publisher report
3.add reconnect client method
  • Loading branch information
jokemanfire authored and mxpv committed Oct 30, 2024
1 parent 792c2e6 commit 6cb625d
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 28 deletions.
9 changes: 6 additions & 3 deletions crates/runc-shim/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ use containerd_shim::{
event::Event,
io_error,
monitor::{Subject, Topic},
protos::{events::task::TaskExit, protobuf::MessageDyn},
protos::{events::task::TaskExit, protobuf::MessageDyn, ttrpc::context::with_timeout},
util::{
convert_to_timestamp, read_options, read_pid_from_file, read_runtime, read_spec, timestamp,
write_str_to_file,
},
Config, Context, DeleteResponse, Error, Flags, StartOpts,
Config, DeleteResponse, Error, Flags, StartOpts,
};
use log::{debug, error, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender};
Expand Down Expand Up @@ -218,8 +218,11 @@ async fn forward(
) {
tokio::spawn(async move {
while let Some((topic, e)) = rx.recv().await {
// While ttrpc push the event,give it a 5 seconds timeout.
// Prevent event reporting from taking too long time.
// Learnd from goshim's containerd/runtime/v2/shim/publisher.go
publisher
.publish(Context::default(), &topic, &ns, e)
.publish(with_timeout(5000000000), &topic, &ns, e)
.await
.unwrap_or_else(|e| warn!("publish {} to containerd: {}", topic, e));
}
Expand Down
120 changes: 95 additions & 25 deletions crates/shim/src/asynchronous/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,108 @@

use std::os::unix::io::RawFd;

use async_trait::async_trait;
use containerd_shim_protos::{
api::Empty,
api::Envelope,
protobuf::MessageDyn,
shim::events,
shim_async::{Client, Events, EventsClient},
shim_async::{Client, EventsClient},
ttrpc,
ttrpc::{context::Context, r#async::TtrpcContext},
ttrpc::context::Context,
};
use log::debug;
use tokio::sync::mpsc;

use crate::{
error::Result,
error::{self, Result},
util::{asyncify, connect, convert_to_any, timestamp},
};

/// The publisher reports events and uses a queue to retry the event reporting.
/// The maximum number of attempts to report is 5 times.
/// When the ttrpc client fails to report, it attempts to reconnect to the client and report.

/// Max queue size
const QUEUE_SIZE: i64 = 1024;
/// Max try five times
const MAX_REQUEUE: i64 = 5;

/// Async Remote publisher connects to containerd's TTRPC endpoint to publish events from shim.
pub struct RemotePublisher {
client: EventsClient,
pub address: String,
sender: mpsc::Sender<Item>,
}

#[derive(Clone, Debug)]
pub struct Item {
ev: Envelope,
ctx: Context,
count: i64,
}

impl RemotePublisher {
/// Connect to containerd's TTRPC endpoint asynchronously.
///
/// containerd uses `/run/containerd/containerd.sock.ttrpc` by default
pub async fn new(address: impl AsRef<str>) -> Result<RemotePublisher> {
let client = Self::connect(address).await?;
let client = Self::connect(address.as_ref()).await?;
// Init the queue channel
let (sender, receiver) = mpsc::channel::<Item>(QUEUE_SIZE as usize);
let rt = RemotePublisher {
address: address.as_ref().to_string(),
sender,
};
rt.process_queue(client, receiver).await;
Ok(rt)
}

Ok(RemotePublisher {
client: EventsClient::new(client),
})
/// Process_queue for push events
///
/// This is a loop task for dealing event tasks
pub async fn process_queue(&self, ttrpc_client: Client, mut receiver: mpsc::Receiver<Item>) {
let mut client = EventsClient::new(ttrpc_client);
let sender = self.sender.clone();
let address = self.address.clone();
tokio::spawn(async move {
// only this use receiver
while let Some(item) = receiver.recv().await {
// drop this event after MAX_REQUEUE try
if item.count > MAX_REQUEUE {
debug!("drop event {:?}", item);
continue;
}
let mut req = events::ForwardRequest::new();
req.set_envelope(item.ev.clone());
let new_item = Item {
ev: item.ev.clone(),
ctx: item.ctx.clone(),
count: item.count + 1,
};
if let Err(e) = client.forward(new_item.ctx.clone(), &req).await {
debug!("publish error {:?}", e);
// This is a bug from ttrpc, ttrpc should return RemoteClosed|ClientClosed error. change it in future
// if e == (ttrpc::error::Error::RemoteClosed || ttrpc::error::Error::ClientClosed)
// reconnect client
let new_client = Self::connect(address.as_str()).await.map_err(|e| {
debug!("reconnect the ttrpc client {:?} fail", e);
});
if let Ok(c) = new_client {
client = EventsClient::new(c);
}
let sender_ref = sender.clone();
// Take a another task requeue , for no blocking the recv task
tokio::spawn(async move {
// wait for few time and send for imporving the success ratio
tokio::time::sleep(tokio::time::Duration::from_secs(new_item.count as u64))
.await;
// if channel is full and send fail ,release it after 3 seconds
let _ = sender_ref
.send_timeout(new_item, tokio::time::Duration::from_secs(3))
.await;
});
}
}
});
debug!("publisher 'process_queue' quit complete");
}

async fn connect(address: impl AsRef<str>) -> Result<Client> {
Expand Down Expand Up @@ -76,37 +148,34 @@ impl RemotePublisher {
envelope.set_timestamp(timestamp()?);
envelope.set_event(convert_to_any(event)?);

let mut req = events::ForwardRequest::new();
req.set_envelope(envelope);
let item = Item {
ev: envelope.clone(),
ctx: ctx.clone(),
count: 0,
};

self.client.forward(ctx, &req).await?;
//if channel is full and send fail ,release it after 3 seconds
self.sender
.send_timeout(item, tokio::time::Duration::from_secs(3))
.await
.map_err(|e| error::Error::Ttrpc(ttrpc::error::Error::Others(e.to_string())))?;

Ok(())
}
}

#[async_trait]
impl Events for RemotePublisher {
async fn forward(
&self,
_ctx: &TtrpcContext,
req: events::ForwardRequest,
) -> ttrpc::Result<Empty> {
self.client.forward(Context::default(), &req).await
}
}

#[cfg(test)]
mod tests {
use std::{
os::unix::{io::AsRawFd, net::UnixListener},
sync::Arc,
};

use async_trait::async_trait;
use containerd_shim_protos::{
api::{Empty, ForwardRequest},
events::task::TaskOOM,
shim_async::create_events,
shim_async::{create_events, Events},
ttrpc::asynchronous::Server,
};
use tokio::sync::{
Expand All @@ -115,6 +184,7 @@ mod tests {
};

use super::*;
use crate::publisher::ttrpc::r#async::TtrpcContext;

struct FakeServer {
tx: Sender<i32>,
Expand Down

0 comments on commit 6cb625d

Please sign in to comment.