Skip to content

Commit

Permalink
Merge pull request #4399 from systeminit/nf/pinga-jetstream
Browse files Browse the repository at this point in the history
feat(pinga): consume work from a Jetstream work queue
  • Loading branch information
nickgerace authored Aug 22, 2024
2 parents 4da2aca + a580e8d commit bd62702
Show file tree
Hide file tree
Showing 22 changed files with 610 additions and 615 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ members = [
"lib/nats-subscriber",
"lib/naxum",
"lib/object-tree",
"lib/pinga-core",
"lib/pinga-server",
"lib/rebaser-core",
"lib/rebaser-server",
Expand Down
64 changes: 28 additions & 36 deletions bin/pinga/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@
use color_eyre::Result;
use std::time::Duration;

use pinga_server::{Config, Server};
use si_service::startup;
use telemetry_application::prelude::*;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use si_service::{color_eyre, prelude::*, rt, shutdown, startup, telemetry_application};

mod args;

const RT_DEFAULT_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 * 3;
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 10);

fn main() -> Result<()> {
let thread_builder = ::std::thread::Builder::new().stack_size(RT_DEFAULT_THREAD_STACK_SIZE);
let thread_handler = thread_builder.spawn(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_stack_size(RT_DEFAULT_THREAD_STACK_SIZE)
.thread_name("bin/pinga-tokio::runtime")
.enable_all()
.build()?
.block_on(async_main())
})?;
thread_handler.join().unwrap()
rt::block_on("bin/pinga-tokio::runtime", async_main())
}

async fn async_main() -> Result<()> {
let shutdown_token = CancellationToken::new();
let task_tracker = TaskTracker::new();
let tracker = TaskTracker::new();
let token = CancellationToken::new();

color_eyre::install()?;
let args = args::parse();
Expand All @@ -40,10 +30,16 @@ async fn async_main() -> Result<()> {
.service_namespace("si")
.log_env_var_prefix("SI")
.app_modules(vec!["pinga", "pinga_server"])
.interesting_modules(vec!["dal", "si_data_nats", "si_data_pg", "si_layer_cache"])
.interesting_modules(vec![
"dal",
"naxum",
"si_data_nats",
"si_data_pg",
"si_layer_cache",
])
.build()?;

telemetry_application::init(config, &task_tracker, shutdown_token.clone())?
telemetry_application::init(config, &tracker, token.clone())?
};

startup::startup("pinga").await?;
Expand All @@ -57,23 +53,19 @@ async fn async_main() -> Result<()> {

let config = Config::try_from(args)?;

task_tracker.close();

Server::from_config(config, shutdown_token.clone(), task_tracker.clone())
.await?
.run()
.await?;
let server = Server::from_config(config, token.clone(), tracker.clone()).await?;

// TODO(fnichol): this will eventually go into the signal handler code. At the moment, in
// sdf's case, this is embedded in server library code, which is incorrect. At this point in
// the program, however, axum has shut down, so it's an appropriate time to cancel other
// remaining tasks and wait on their graceful shutdowns.
{
shutdown_token.cancel();
task_tracker.wait().await;
telemetry_shutdown.wait().await?;
}
tracker.spawn(async move {
info!("ready to receive messages");
server.run().await
});

info!("graceful shutdown complete.");
Ok(())
shutdown::graceful(
tracker,
token,
Some(telemetry_shutdown.into_future()),
Some(GRACEFUL_SHUTDOWN_TIMEOUT),
)
.await
.map_err(Into::into)
}
15 changes: 7 additions & 8 deletions lib/dal-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,10 @@ pub async fn jwt_private_signing_key() -> Result<RS256KeyPair> {

/// Configures and builds a [`pinga_server::Server`] suitable for running alongside DAL
/// object-related tests.
pub fn pinga_server(services_context: ServicesContext) -> Result<pinga_server::Server> {
pub async fn pinga_server(
services_context: ServicesContext,
shutdown_token: CancellationToken,
) -> Result<pinga_server::Server> {
let config: pinga_server::Config = {
let mut config_file = pinga_server::ConfigFile::default();
pinga_server::detect_and_configure_development(&mut config_file)
Expand All @@ -590,7 +593,9 @@ pub fn pinga_server(services_context: ServicesContext) -> Result<pinga_server::S
config.instance_id(),
config.concurrency(),
services_context,
shutdown_token,
)
.await
.wrap_err("failed to create Pinga server")?;

Ok(server)
Expand Down Expand Up @@ -722,8 +727,7 @@ async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> {
let srv_services_ctx = test_context
.create_services_context(token.clone(), tracker.clone())
.await;
let pinga_server = pinga_server(srv_services_ctx)?;
let pinga_server_handle = pinga_server.shutdown_handle();
let pinga_server = pinga_server(srv_services_ctx, token.clone()).await?;
tracker.spawn(pinga_server.run());

// Start up a Rebaser server for migrations
Expand Down Expand Up @@ -765,11 +769,6 @@ async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> {
.await
.wrap_err("failed to run builtin migrations")?;

// Shut down the Pinga server (each test gets its own server instance with an exclusively
// unique subject prefix)
info!("shutting down initial migrations Pinga server");
pinga_server_handle.shutdown().await;

// Shut down the Veritech server (each test gets its own server instance with an exclusively
// unique subject prefix)
info!("shutting down initial migrations Veritech server");
Expand Down
1 change: 1 addition & 0 deletions lib/dal/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rust_library(
deps = [
"//lib/nats-subscriber:nats-subscriber",
"//lib/object-tree:object-tree",
"//lib/pinga-core:pinga-core",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
Expand Down
1 change: 1 addition & 0 deletions lib/dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ object-tree = { path = "../../lib/object-tree" }
once_cell = { workspace = true }
paste = { workspace = true }
petgraph = { workspace = true }
pinga-core = { path = "../../lib/pinga-core" }
postcard = { workspace = true }
postgres-types = { workspace = true }
rand = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions lib/dal/src/job/processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use dyn_clone::DynClone;
use si_data_nats::async_nats;
use thiserror::Error;

use crate::{
Expand All @@ -17,6 +18,10 @@ pub enum JobQueueProcessorError {
BlockingJob(#[from] BlockingJobError),
#[error(transparent)]
JobProducer(#[from] JobProducerError),
#[error("stream create error: {0}")]
JsCreateStreamError(#[from] async_nats::jetstream::context::CreateStreamError),
#[error("missing required workspace_pk")]
MissingWorkspacePk,
#[error(transparent)]
Serde(#[from] serde_json::Error),
#[error(transparent)]
Expand Down
92 changes: 66 additions & 26 deletions lib/dal/src/job/processor/nats_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use futures::StreamExt;
use si_data_nats::{NatsClient, Subject};
use pinga_core::{pinga_work_queue, subject::pinga_job, REPLY_INBOX_HEADER_NAME};
use si_data_nats::{jetstream, NatsClient, Subject};
use telemetry::prelude::*;
use telemetry_nats::propagation;
use tokio::task::JoinSet;
Expand All @@ -13,25 +14,23 @@ use crate::job::{

use super::{JobQueueProcessor, JobQueueProcessorError, JobQueueProcessorResult};

const NATS_JOB_QUEUE: &str = "pinga-jobs";

#[derive(Clone, Debug)]
pub struct NatsProcessor {
client: NatsClient,
pinga_subject: Subject,
context: jetstream::Context,
prefix: Option<String>,
}

impl NatsProcessor {
pub fn new(client: NatsClient) -> Self {
let pinga_subject = if let Some(prefix) = client.metadata().subject_prefix() {
format!("{prefix}.{NATS_JOB_QUEUE}").into()
} else {
NATS_JOB_QUEUE.into()
};
// Take the *active* subject prefix from the connected NATS client
let prefix = client.metadata().subject_prefix().map(|s| s.to_owned());
let context = jetstream::new(client.clone());

Self {
client,
pinga_subject,
context,
prefix,
}
}

Expand All @@ -42,23 +41,39 @@ impl NatsProcessor {
fields()
)]
async fn push_all_jobs(&self, queue: JobQueue) -> JobQueueProcessorResult<()> {
// Ensure the Jetstream `Stream` is created before publishing to it
let _stream = pinga_work_queue(&self.context, self.prefix.as_deref()).await?;

let headers = propagation::empty_injected_headers();

while let Some(element) = queue.fetch_job().await {
let job_info = JobInfo::new(element)?;

if let Err(err) = self
.client
let workspace_pk = job_info
.access_builder
.tenancy()
.workspace_pk()
.ok_or(JobQueueProcessorError::MissingWorkspacePk)?;

let subject = pinga_job(
self.prefix.as_deref(),
&String::from(workspace_pk),
&String::from(job_info.visibility.change_set_id),
&job_info.kind,
);

self.context
.publish_with_headers(
self.pinga_subject.clone(),
subject,
headers.clone(),
serde_json::to_vec(&job_info)?.into(),
)
.await
{
error!("Nats job push failed, some jobs will be dropped");
return Err(JobQueueProcessorError::Transport(Box::new(err)));
}
// If `Err` then message failed to publish
.map_err(|err| JobQueueProcessorError::Transport(Box::new(err)))?
.await
// If `Err` then NATS server failed to ack
.map_err(|err| JobQueueProcessorError::Transport(Box::new(err)))?;
}
Ok(())
}
Expand All @@ -67,29 +82,54 @@ impl NatsProcessor {
#[async_trait]
impl JobQueueProcessor for NatsProcessor {
async fn block_on_job(&self, job: Box<dyn JobProducer + Send + Sync>) -> BlockingJobResult {
let mut job_info = JobInfo::new_blocking(job)
// Ensure the Jetstream `Stream` is created before publishing to it
let _stream = pinga_work_queue(&self.context, self.prefix.as_deref())
.await
.map_err(|err| BlockingJobError::JsCreateStreamError(err.to_string()))?;

let job_info = JobInfo::new_blocking(job)
.map_err(|e: JobProducerError| BlockingJobError::JobProducer(e.to_string()))?;

job_info.blocking = true;
let reply_inbox = Subject::from(self.client.new_inbox());

let mut headers = propagation::empty_injected_headers();
headers.insert(REPLY_INBOX_HEADER_NAME, reply_inbox.to_string());

let job_reply_inbox = Subject::from(self.client.new_inbox());
let mut reply_subscriber = self
.client
.subscribe(job_reply_inbox.clone())
.subscribe(reply_inbox.clone())
.await
.map_err(|e| BlockingJobError::Nats(e.to_string()))?;
self.client
.publish_with_reply_and_headers(
self.pinga_subject.clone(),
job_reply_inbox,
propagation::empty_injected_headers(),

let workspace_pk = job_info
.access_builder
.tenancy()
.workspace_pk()
.ok_or(BlockingJobError::MissingWorkspacePk)?;

let subject = pinga_job(
self.prefix.as_deref(),
&String::from(workspace_pk),
&String::from(job_info.visibility.change_set_id),
&job_info.kind,
);

self.context
.publish_with_headers(
subject,
headers,
serde_json::to_vec(&job_info)
.map_err(|e| BlockingJobError::Serde(e.to_string()))?
.into(),
)
.await
// If `Err` then message failed to publish
.map_err(|e| BlockingJobError::Nats(e.to_string()))?
.await
// If `Err` then NATS server failed to ack
.map_err(|e| BlockingJobError::Nats(e.to_string()))?;

// TODO(fnichol): hrm, no timeout, so we wait forever? That's probably not expected?
match reply_subscriber.next().await {
Some(message) => serde_json::from_slice::<BlockingJobResult>(message.payload())
.map_err(|e| BlockingJobError::Serde(e.to_string()))?,
Expand Down
4 changes: 4 additions & 0 deletions lib/dal/src/job/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum BlockingJobError {
JobExecution(String),
#[error("JobProducer error: {0}")]
JobProducer(String),
#[error("stream create error: {0}")]
JsCreateStreamError(String),
#[error("missing required workspace_pk")]
MissingWorkspacePk,
#[error("A nats error occurred: {0}")]
Nats(String),
#[error("no access builder found in job info")]
Expand Down
11 changes: 11 additions & 0 deletions lib/pinga-core/BUCK
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
load("@prelude-si//:macros.bzl", "rust_library")

# Build target for the `pinga-core` library crate.
rust_library(
    name = "pinga-core",
    # Every Rust source file found under `src/`.
    srcs = glob(["src/**/*.rs"]),
    # The only first-party dependency listed for this target.
    deps = [
        "//lib/si-data-nats:si-data-nats",
    ],
)
Loading

0 comments on commit bd62702

Please sign in to comment.