diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 16cbcf2b3cfa8..55c64258d5ec2 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -711,6 +711,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "common-kafka" +version = "0.1.0" +dependencies = [ + "chrono", + "envconfig", + "futures", + "health", + "rdkafka", + "serde", + "serde_json", + "thiserror", + "time", + "tracing", + "uuid", +] + [[package]] name = "common-metrics" version = "0.1.0" @@ -861,6 +878,7 @@ dependencies = [ "axum 0.7.5", "chrono", "common-dns", + "common-kafka", "common-metrics", "cyclotron-core", "envconfig", @@ -869,11 +887,13 @@ dependencies = [ "http 1.1.0", "httpmock", "rand", + "rdkafka", "reqwest 0.12.3", "serde", "serde_json", "sqlx", "thiserror", + "time", "tokio", "tracing", "tracing-subscriber", @@ -886,12 +906,16 @@ version = "0.1.0" dependencies = [ "axum 0.7.5", "chrono", + "common-kafka", "common-metrics", "cyclotron-core", "envconfig", "eyre", "health", + "rdkafka", + "serde_json", "sqlx", + "time", "tokio", "tracing", "tracing-subscriber", @@ -1585,10 +1609,8 @@ version = "0.1.0" dependencies = [ "async-trait", "chrono", - "envconfig", - "health", + "common-kafka", "http 1.1.0", - "rdkafka", "reqwest 0.12.3", "serde", "serde_json", @@ -1597,7 +1619,6 @@ dependencies = [ "time", "tokio", "tracing", - "uuid", ] [[package]] @@ -1606,6 +1627,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum 0.7.5", + "common-kafka", "common-metrics", "envconfig", "eyre", @@ -1631,6 +1653,7 @@ dependencies = [ "axum 0.7.5", "chrono", "common-dns", + "common-kafka", "common-metrics", "envconfig", "futures", diff --git a/rust/common/kafka/Cargo.toml b/rust/common/kafka/Cargo.toml new file mode 100644 index 0000000000000..715e7bc04265a --- /dev/null +++ b/rust/common/kafka/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "common-kafka" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +chrono = { workspace = true } +envconfig = { workspace = true } 
+health = { path = "../health" } +rdkafka = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +time = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } +thiserror = { workspace = true } +futures = { workspace = true } diff --git a/rust/hook-common/src/config.rs b/rust/common/kafka/src/config.rs similarity index 100% rename from rust/hook-common/src/config.rs rename to rust/common/kafka/src/config.rs diff --git a/rust/hook-common/src/kafka_messages/app_metrics.rs b/rust/common/kafka/src/kafka_messages/app_metrics.rs similarity index 100% rename from rust/hook-common/src/kafka_messages/app_metrics.rs rename to rust/common/kafka/src/kafka_messages/app_metrics.rs diff --git a/rust/hook-common/src/kafka_messages/app_metrics2.rs b/rust/common/kafka/src/kafka_messages/app_metrics2.rs similarity index 98% rename from rust/hook-common/src/kafka_messages/app_metrics2.rs rename to rust/common/kafka/src/kafka_messages/app_metrics2.rs index 3c2510fb010c0..db69513231ae7 100644 --- a/rust/hook-common/src/kafka_messages/app_metrics2.rs +++ b/rust/common/kafka/src/kafka_messages/app_metrics2.rs @@ -7,6 +7,7 @@ use super::{deserialize_datetime, serialize_datetime}; #[serde(rename_all = "lowercase")] pub enum Source { Hoghooks, + Cyclotron, } #[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] @@ -14,6 +15,7 @@ pub enum Source { pub enum Kind { Success, Failure, + Unknown, } #[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] diff --git a/rust/common/kafka/src/kafka_messages/log_entries.rs b/rust/common/kafka/src/kafka_messages/log_entries.rs new file mode 100644 index 0000000000000..fbc896bb56b27 --- /dev/null +++ b/rust/common/kafka/src/kafka_messages/log_entries.rs @@ -0,0 +1,23 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +enum Level { + Error, + Debug, + Warn, + Info, + Warning, +} + 
+#[derive(Debug, Clone, Deserialize, Serialize)] +struct LogEntry { + team_id: u32, + log_source: String, + log_source_id: String, + instance_id: String, + timestamp: DateTime, + level: Level, + message: String, +} diff --git a/rust/hook-common/src/kafka_messages/mod.rs b/rust/common/kafka/src/kafka_messages/mod.rs similarity index 97% rename from rust/hook-common/src/kafka_messages/mod.rs rename to rust/common/kafka/src/kafka_messages/mod.rs index 92b9c605956e0..6507a2cc4cfcf 100644 --- a/rust/hook-common/src/kafka_messages/mod.rs +++ b/rust/common/kafka/src/kafka_messages/mod.rs @@ -1,5 +1,6 @@ pub mod app_metrics; pub mod app_metrics2; +pub mod log_entries; pub mod plugin_logs; use chrono::{DateTime, NaiveDateTime, Utc}; diff --git a/rust/hook-common/src/kafka_messages/plugin_logs.rs b/rust/common/kafka/src/kafka_messages/plugin_logs.rs similarity index 100% rename from rust/hook-common/src/kafka_messages/plugin_logs.rs rename to rust/common/kafka/src/kafka_messages/plugin_logs.rs diff --git a/rust/common/kafka/src/kafka_producer.rs b/rust/common/kafka/src/kafka_producer.rs new file mode 100644 index 0000000000000..4f30850125415 --- /dev/null +++ b/rust/common/kafka/src/kafka_producer.rs @@ -0,0 +1,121 @@ +use crate::config::KafkaConfig; + +use futures::future::join_all; +use health::HealthHandle; +use rdkafka::error::KafkaError; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::ClientConfig; +use serde::Serialize; +use serde_json::error::Error as SerdeError; +use thiserror::Error; +use tracing::debug; + +pub struct KafkaContext { + liveness: HealthHandle, +} + +impl rdkafka::ClientContext for KafkaContext { + fn stats(&self, _: rdkafka::Statistics) { + // Signal liveness, as the main rdkafka loop is running and calling us + self.liveness.report_healthy_blocking(); + + // TODO: Take stats recording pieces that we want from `capture-rs`. 
+ } +} + +pub async fn create_kafka_producer( + config: &KafkaConfig, + liveness: HealthHandle, +) -> Result, KafkaError> { + let mut client_config = ClientConfig::new(); + client_config + .set("bootstrap.servers", &config.kafka_hosts) + .set("statistics.interval.ms", "10000") + .set("linger.ms", config.kafka_producer_linger_ms.to_string()) + .set( + "message.timeout.ms", + config.kafka_message_timeout_ms.to_string(), + ) + .set( + "compression.codec", + config.kafka_compression_codec.to_owned(), + ) + .set( + "queue.buffering.max.kbytes", + (config.kafka_producer_queue_mib * 1024).to_string(), + ); + + if config.kafka_tls { + client_config + .set("security.protocol", "ssl") + .set("enable.ssl.certificate.verification", "false"); + }; + + debug!("rdkafka configuration: {:?}", client_config); + let api: FutureProducer = + client_config.create_with_context(KafkaContext { liveness })?; + + // TODO: ping the kafka brokers to confirm configuration is OK (copy capture) + + Ok(api) +} + +#[derive(Error, Debug)] +pub enum KafkaProduceError { + #[error("failed to serialize: {error}")] + SerializationError { error: SerdeError }, + #[error("failed to produce to kafka: {error}")] + KafkaProduceError { error: KafkaError }, + #[error("failed to produce to kafka (timeout)")] + KafkaProduceCanceled, +} + +pub async fn send_iter_to_kafka( + kafka_producer: &FutureProducer, + topic: &str, + iter: impl IntoIterator, +) -> Result<(), KafkaProduceError> +where + T: Serialize, +{ + let mut payloads = Vec::new(); + + for i in iter { + let payload = serde_json::to_string(&i) + .map_err(|e| KafkaProduceError::SerializationError { error: e })?; + payloads.push(payload); + } + + if payloads.is_empty() { + return Ok(()); + } + + let mut delivery_futures = Vec::new(); + + for payload in payloads { + match kafka_producer.send_result(FutureRecord { + topic, + payload: Some(&payload), + partition: None, + key: None::<&str>, + timestamp: None, + headers: None, + }) { + Ok(future) => 
delivery_futures.push(future), + Err((error, _)) => return Err(KafkaProduceError::KafkaProduceError { error }), + } + } + + for result in join_all(delivery_futures).await { + match result { + Ok(Ok(_)) => {} + Ok(Err((error, _))) => return Err(KafkaProduceError::KafkaProduceError { error }), + Err(_) => { + // Cancelled due to timeout while retrying + return Err(KafkaProduceError::KafkaProduceCanceled); + } + } + } + + Ok(()) +} diff --git a/rust/common/kafka/src/lib.rs b/rust/common/kafka/src/lib.rs new file mode 100644 index 0000000000000..ddfa0625f41c3 --- /dev/null +++ b/rust/common/kafka/src/lib.rs @@ -0,0 +1,7 @@ +pub mod config; +pub mod kafka_messages; +pub mod kafka_producer; +pub mod test; + +pub const APP_METRICS_TOPIC: &str = "app_metrics"; +pub const APP_METRICS2_TOPIC: &str = "app_metrics2"; diff --git a/rust/hook-common/src/test.rs b/rust/common/kafka/src/test.rs similarity index 100% rename from rust/hook-common/src/test.rs rename to rust/common/kafka/src/test.rs diff --git a/rust/cyclotron-core/.sqlx/query-2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json b/rust/cyclotron-core/.sqlx/query-2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json new file mode 100644 index 0000000000000..cfcbdd6288f56 --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE cyclotron_jobs SET state = 'running', lock_id = $1, last_heartbeat=NOW() WHERE id = $2 returning queue_name", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "queue_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": ["Uuid", "Uuid"] + }, + "nullable": [false] + }, + "hash": "2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89" +} diff --git a/rust/cyclotron-core/.sqlx/query-2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7.json 
b/rust/cyclotron-core/.sqlx/query-2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7.json new file mode 100644 index 0000000000000..e69786b54b25e --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE cyclotron_jobs SET state = 'available', lock_id = NULL, queue_name = $1 WHERE id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": ["Text", "Uuid"] + }, + "nullable": [] + }, + "hash": "2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7" +} diff --git a/rust/cyclotron-core/.sqlx/query-385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025.json b/rust/cyclotron-core/.sqlx/query-385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025.json new file mode 100644 index 0000000000000..5c6b66d3f8739 --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\nSELECT id FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": ["Timestamptz", "Int2"] + }, + "nullable": [false] + }, + "hash": "385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025" +} diff --git a/rust/cyclotron-core/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json b/rust/cyclotron-core/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json new file mode 100644 index 0000000000000..d70d4c9d33a43 --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM cyclotron_jobs WHERE queue_name = $1", + "describe": 
{ + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": ["Text"] + }, + "nullable": [null] + }, + "hash": "78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8" +} diff --git a/rust/cyclotron-core/.sqlx/query-b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b.json b/rust/cyclotron-core/.sqlx/query-b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b.json new file mode 100644 index 0000000000000..8f201d80503ce --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cyclotron_dead_letter_metadata (job_id, original_queue_name, reason, dlq_time) VALUES ($1, $2, $3, NOW())", + "describe": { + "columns": [], + "parameters": { + "Left": ["Uuid", "Text", "Text"] + }, + "nullable": [] + }, + "hash": "b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b" +} diff --git a/rust/cyclotron-core/.sqlx/query-e842f1ed33747bde4570c6d861d856c4cbd8beb519df8212212017dda9d06c51.json b/rust/cyclotron-core/.sqlx/query-e842f1ed33747bde4570c6d861d856c4cbd8beb519df8212212017dda9d06c51.json new file mode 100644 index 0000000000000..145fe9bf7bc9f --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-e842f1ed33747bde4570c6d861d856c4cbd8beb519df8212212017dda9d06c51.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "\nWITH to_delete AS (\n DELETE FROM cyclotron_jobs\n WHERE state IN ('failed', 'completed')\n RETURNING last_transition, team_id, function_id::text, state::text\n),\naggregated_data AS (\n SELECT\n date_trunc('hour', last_transition) AS hour,\n team_id,\n function_id,\n state,\n COUNT(*) AS count\n FROM to_delete\n GROUP BY hour, team_id, function_id, state\n)\nSELECT\n hour as \"hour!\",\n team_id as \"team_id!\",\n function_id,\n state as \"state!\",\n count as \"count!\"\nFROM aggregated_data", + "describe": { + "columns": [ + { + 
"ordinal": 0, + "name": "hour!", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "team_id!", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "function_id", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "state!", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [null, false, null, null, null] + }, + "hash": "e842f1ed33747bde4570c6d861d856c4cbd8beb519df8212212017dda9d06c51" +} diff --git a/rust/cyclotron-core/.sqlx/query-f4e808f58dd290c6e2b49b63e9e0eb022936ba318021512a0cc0c2e0766abe7c.json b/rust/cyclotron-core/.sqlx/query-f4e808f58dd290c6e2b49b63e9e0eb022936ba318021512a0cc0c2e0766abe7c.json deleted file mode 100644 index a585e9f7e7d73..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-f4e808f58dd290c6e2b49b63e9e0eb022936ba318021512a0cc0c2e0766abe7c.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM cyclotron_jobs WHERE state = 'completed'", - "describe": { - "columns": [], - "parameters": { - "Left": [] - }, - "nullable": [] - }, - "hash": "f4e808f58dd290c6e2b49b63e9e0eb022936ba318021512a0cc0c2e0766abe7c" -} diff --git a/rust/cyclotron-core/.sqlx/query-fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb.json b/rust/cyclotron-core/.sqlx/query-fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb.json deleted file mode 100644 index 09fc24b340d3f..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\nDELETE FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": ["Timestamptz", "Int2"] - }, - "nullable": [] - }, - "hash": "fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb" -} diff --git 
a/rust/cyclotron-core/.sqlx/query-ffb66bdedf6506f95b9293ef88b0c51e2f5fb7d3271e1287165d2a35b6aaa25e.json b/rust/cyclotron-core/.sqlx/query-ffb66bdedf6506f95b9293ef88b0c51e2f5fb7d3271e1287165d2a35b6aaa25e.json deleted file mode 100644 index 605d79d57c098..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-ffb66bdedf6506f95b9293ef88b0c51e2f5fb7d3271e1287165d2a35b6aaa25e.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM cyclotron_jobs WHERE state = 'failed'", - "describe": { - "columns": [], - "parameters": { - "Left": [] - }, - "nullable": [] - }, - "hash": "ffb66bdedf6506f95b9293ef88b0c51e2f5fb7d3271e1287165d2a35b6aaa25e" -} diff --git a/rust/cyclotron-core/Cargo.toml b/rust/cyclotron-core/Cargo.toml index bfec9301eee72..18598fd0b37f5 100644 --- a/rust/cyclotron-core/Cargo.toml +++ b/rust/cyclotron-core/Cargo.toml @@ -14,4 +14,4 @@ tokio = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true } rand = { workspace = true } -futures = { workspace = true } \ No newline at end of file +futures = { workspace = true } diff --git a/rust/cyclotron-core/src/janitor.rs b/rust/cyclotron-core/src/janitor.rs index 8fd98307fba67..dd48f031b1060 100644 --- a/rust/cyclotron-core/src/janitor.rs +++ b/rust/cyclotron-core/src/janitor.rs @@ -4,11 +4,10 @@ use sqlx::PgPool; use crate::{ ops::{ - janitor::{ - delete_completed_jobs, delete_failed_jobs, detect_poison_pills, reset_stalled_jobs, - }, + janitor::{delete_completed_and_failed_jobs, detect_poison_pills, reset_stalled_jobs}, meta::{count_total_waiting_jobs, dead_letter, run_migrations}, }, + types::AggregatedDelete, PoolConfig, QueueError, }; @@ -31,12 +30,10 @@ impl Janitor { run_migrations(&self.pool).await; } - pub async fn delete_completed_jobs(&self) -> Result { - delete_completed_jobs(&self.pool).await - } - - pub async fn delete_failed_jobs(&self) -> Result { - delete_failed_jobs(&self.pool).await + pub async fn delete_completed_and_failed_jobs( + &self, + ) 
-> Result, QueueError> { + delete_completed_and_failed_jobs(&self.pool).await } pub async fn reset_stalled_jobs(&self, timeout: Duration) -> Result { diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index e737f38360165..f845ccee042f8 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -4,6 +4,7 @@ mod ops; // Types mod types; +pub use types::AggregatedDelete; pub use types::BulkInsertResult; pub use types::Bytes; pub use types::Job; diff --git a/rust/cyclotron-core/src/ops/janitor.rs b/rust/cyclotron-core/src/ops/janitor.rs index 16bdb9180f0f9..488d629730342 100644 --- a/rust/cyclotron-core/src/ops/janitor.rs +++ b/rust/cyclotron-core/src/ops/janitor.rs @@ -2,32 +2,49 @@ use chrono::{Duration, Utc}; use uuid::Uuid; use crate::error::QueueError; +use crate::types::AggregatedDelete; // As a general rule, janitor operations are not queue specific (as in, they don't account for the // queue name). We can revisit this later, if we decide we need the ability to do janitor operations // on a per-queue basis. 
-pub async fn delete_completed_jobs<'c, E>(executor: E) -> Result -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let result = sqlx::query!("DELETE FROM cyclotron_jobs WHERE state = 'completed'") - .execute(executor) - .await - .map_err(QueueError::from)?; - - Ok(result.rows_affected()) -} -pub async fn delete_failed_jobs<'c, E>(executor: E) -> Result +pub async fn delete_completed_and_failed_jobs<'c, E>( + executor: E, +) -> Result, QueueError> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { - let result = sqlx::query!("DELETE FROM cyclotron_jobs WHERE state = 'failed'") - .execute(executor) - .await - .map_err(QueueError::from)?; + let result: Vec = sqlx::query_as!( + AggregatedDelete, + r#" +WITH to_delete AS ( + DELETE FROM cyclotron_jobs + WHERE state IN ('failed', 'completed') + RETURNING last_transition, team_id, function_id::text, state::text +), +aggregated_data AS ( + SELECT + date_trunc('hour', last_transition) AS hour, + team_id, + function_id, + state, + COUNT(*) AS count + FROM to_delete + GROUP BY hour, team_id, function_id, state +) +SELECT + hour as "hour!", + team_id as "team_id!", + function_id, + state as "state!", + count as "count!" +FROM aggregated_data"# + ) + .fetch_all(executor) + .await + .map_err(QueueError::from)?; - Ok(result.rows_affected()) + Ok(result) } // Jobs are considered stalled if their lock is held and their last_heartbeat is older than `timeout`. 
diff --git a/rust/cyclotron-core/src/types.rs b/rust/cyclotron-core/src/types.rs index 5adf86c6050b4..a2def554794e0 100644 --- a/rust/cyclotron-core/src/types.rs +++ b/rust/cyclotron-core/src/types.rs @@ -1,8 +1,7 @@ -use std::str::FromStr; - use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; +use std::str::FromStr; use uuid::Uuid; use crate::QueueError; @@ -143,3 +142,14 @@ impl Default for BulkInsertResult { Self::new() } } + +// Result of janitor's `delete_completed_and_failed_jobs` +#[derive(sqlx::FromRow, Debug)] +pub struct AggregatedDelete { + // `last_transition` column truncated to the hour. + pub hour: DateTime, + pub team_id: i64, + pub function_id: Option, + pub state: String, + pub count: i64, +} diff --git a/rust/cyclotron-fetch/Cargo.toml b/rust/cyclotron-fetch/Cargo.toml index e9f8de05bcff0..69f6f4ac2adf1 100644 --- a/rust/cyclotron-fetch/Cargo.toml +++ b/rust/cyclotron-fetch/Cargo.toml @@ -18,6 +18,7 @@ thiserror = { workspace = true } cyclotron-core = { path = "../cyclotron-core" } common-metrics = { path = "../common/metrics" } common-dns = { path = "../common/dns" } +common-kafka = { path = "../common/kafka" } health = { path = "../common/health" } reqwest = { workspace = true } serde = { workspace = true } @@ -25,7 +26,9 @@ serde_json = { workspace = true } http = { workspace = true } rand = { workspace = true } futures = { workspace = true } +time = { workspace = true } +rdkafka = { workspace = true } [dev-dependencies] sqlx = { workspace = true } -httpmock = { workspace = true } \ No newline at end of file +httpmock = { workspace = true } diff --git a/rust/cyclotron-fetch/src/config.rs b/rust/cyclotron-fetch/src/config.rs index a57cbafe5e287..752ba5f32217a 100644 --- a/rust/cyclotron-fetch/src/config.rs +++ b/rust/cyclotron-fetch/src/config.rs @@ -3,6 +3,8 @@ use cyclotron_core::PoolConfig; use envconfig::Envconfig; use uuid::Uuid; +use common_kafka::config::KafkaConfig; + 
#[derive(Envconfig)] pub struct Config { #[envconfig(from = "BIND_HOST", default = "::")] @@ -61,6 +63,9 @@ pub struct Config { #[envconfig(default = "4000")] pub retry_backoff_base_ms: i64, + + #[envconfig(nested = true)] + pub kafka: KafkaConfig, } #[allow(dead_code)] @@ -86,7 +91,7 @@ pub struct AppConfig { } impl Config { - pub fn to_components(self) -> (AppConfig, PoolConfig) { + pub fn to_components(self) -> (AppConfig, PoolConfig, KafkaConfig) { let app_config = AppConfig { host: self.host, port: self.port, @@ -112,6 +117,6 @@ impl Config { idle_timeout_seconds: Some(self.pg_idle_timeout_seconds), }; - (app_config, pool_config) + (app_config, pool_config, self.kafka) } } diff --git a/rust/cyclotron-fetch/src/context.rs b/rust/cyclotron-fetch/src/context.rs index f10f4149b1ada..d88acd1633c02 100644 --- a/rust/cyclotron-fetch/src/context.rs +++ b/rust/cyclotron-fetch/src/context.rs @@ -1,7 +1,11 @@ use std::sync::{Arc, RwLock}; +use common_kafka::config::KafkaConfig; +use common_kafka::kafka_producer::create_kafka_producer; +use common_kafka::kafka_producer::KafkaContext; use cyclotron_core::{PoolConfig, Worker, SHARD_ID_KEY}; use health::HealthHandle; +use rdkafka::producer::FutureProducer; use tokio::sync::Semaphore; use crate::{config::AppConfig, fetch::FetchError}; @@ -9,6 +13,7 @@ use crate::{config::AppConfig, fetch::FetchError}; pub struct AppContext { pub worker: Worker, pub client: reqwest::Client, + pub kafka_producer: FutureProducer, pub concurrency_limit: Arc, pub liveness: HealthHandle, pub config: AppConfig, @@ -19,7 +24,9 @@ impl AppContext { pub async fn create( config: AppConfig, pool_config: PoolConfig, + kafka_config: KafkaConfig, liveness: HealthHandle, + kafka_liveness: HealthHandle, ) -> Result { let concurrency_limit = Arc::new(Semaphore::new(config.concurrent_requests_limit as usize)); @@ -51,9 +58,14 @@ impl AppContext { ("queue_served".to_string(), config.queue_served.clone()), ]; + let kafka_producer = 
create_kafka_producer(&kafka_config, kafka_liveness) + .await + .expect("failed to create kafka producer"); + Ok(Self { worker, client, + kafka_producer, concurrency_limit, liveness, config, diff --git a/rust/cyclotron-fetch/src/main.rs b/rust/cyclotron-fetch/src/main.rs index c0c02c6f5404b..2013f1b6c7218 100644 --- a/rust/cyclotron-fetch/src/main.rs +++ b/rust/cyclotron-fetch/src/main.rs @@ -54,7 +54,7 @@ async fn main() { let liveness = HealthRegistry::new("liveness"); - let (app_config, pool_config) = config.to_components(); + let (app_config, pool_config, kafka_config) = config.to_components(); let bind = format!("{}:{}", app_config.host, app_config.port); info!( @@ -69,11 +69,21 @@ async fn main() { ) .await; + let kafka_liveness = liveness + .register("rdkafka".to_string(), time::Duration::seconds(30)) + .await; + let app = setup_metrics_routes(app(liveness, app_config.worker_id.clone())); - let context = AppContext::create(app_config, pool_config, worker_liveness) - .await - .expect("failed to create app context"); + let context = AppContext::create( + app_config, + pool_config, + kafka_config, + worker_liveness, + kafka_liveness, + ) + .await + .expect("failed to create app context"); context.worker.run_migrations().await; diff --git a/rust/cyclotron-fetch/tests/utils.rs b/rust/cyclotron-fetch/tests/utils.rs index 6041a491d3f9b..7faef3d4bec08 100644 --- a/rust/cyclotron-fetch/tests/utils.rs +++ b/rust/cyclotron-fetch/tests/utils.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use chrono::{Duration, Utc}; +use common_kafka::test::create_mock_kafka; use cyclotron_core::{Bytes, Job, JobInit, QueueError, Worker}; use cyclotron_fetch::{ config::AppConfig, @@ -39,9 +40,12 @@ pub async fn get_app_test_context(db: PgPool) -> AppContext { allow_internal_ips: true, }; + let (_, mock_producer) = create_mock_kafka().await; + AppContext { worker, client, + kafka_producer: mock_producer, concurrency_limit, liveness, config, diff --git a/rust/cyclotron-janitor/Cargo.toml 
b/rust/cyclotron-janitor/Cargo.toml index 3363a16aac4a6..d6eb553d3e72f 100644 --- a/rust/cyclotron-janitor/Cargo.toml +++ b/rust/cyclotron-janitor/Cargo.toml @@ -17,7 +17,11 @@ axum = { workspace = true } eyre = { workspace = true } cyclotron-core = { path = "../cyclotron-core" } common-metrics = { path = "../common/metrics" } +common-kafka = { path = "../common/kafka" } health = { path = "../common/health" } +time = { workspace = true } +rdkafka = { workspace = true } [dev-dependencies] -sqlx = { workspace = true } \ No newline at end of file +sqlx = { workspace = true } +serde_json = { workspace = true } diff --git a/rust/cyclotron-janitor/src/config.rs b/rust/cyclotron-janitor/src/config.rs index 40ab9ee558a52..005f438c8f1d2 100644 --- a/rust/cyclotron-janitor/src/config.rs +++ b/rust/cyclotron-janitor/src/config.rs @@ -4,6 +4,8 @@ use cyclotron_core::PoolConfig; use envconfig::Envconfig; use uuid::Uuid; +use common_kafka::config::KafkaConfig; + #[derive(Envconfig)] pub struct Config { #[envconfig(from = "BIND_HOST", default = "::")] @@ -46,6 +48,9 @@ pub struct Config { #[envconfig(default = "60")] pub janitor_stall_timeout_seconds: u16, + + #[envconfig(nested = true)] + pub kafka: KafkaConfig, } #[allow(dead_code)] @@ -73,6 +78,7 @@ impl Config { JanitorConfig { pool: pool_config, + kafka: self.kafka.clone(), settings, } } @@ -80,6 +86,7 @@ impl Config { pub struct JanitorConfig { pub pool: PoolConfig, + pub kafka: KafkaConfig, pub settings: JanitorSettings, } diff --git a/rust/cyclotron-janitor/src/janitor.rs b/rust/cyclotron-janitor/src/janitor.rs index be36c07ec009d..6fc0b248b54ab 100644 --- a/rust/cyclotron-janitor/src/janitor.rs +++ b/rust/cyclotron-janitor/src/janitor.rs @@ -1,5 +1,14 @@ -use cyclotron_core::{QueueError, SHARD_ID_KEY}; -use tracing::{info, warn}; +use common_kafka::kafka_messages::app_metrics2::{ + AppMetric2, Kind as AppMetric2Kind, Source as AppMetric2Source, +}; +use common_kafka::kafka_producer::create_kafka_producer; +use 
common_kafka::kafka_producer::{send_iter_to_kafka, KafkaContext, KafkaProduceError}; +use common_kafka::APP_METRICS2_TOPIC; +use cyclotron_core::{AggregatedDelete, QueueError, SHARD_ID_KEY}; +use health::HealthRegistry; +use tracing::{error, info, warn}; + +use rdkafka::producer::FutureProducer; use crate::{ config::{JanitorConfig, JanitorSettings}, @@ -17,12 +26,16 @@ pub struct CleanupResult { pub struct Janitor { pub inner: cyclotron_core::Janitor, + pub kafka_producer: FutureProducer, pub settings: JanitorSettings, pub metrics_labels: Vec<(String, String)>, } impl Janitor { - pub async fn new(config: JanitorConfig) -> Result { + pub async fn new( + config: JanitorConfig, + health_registry: &HealthRegistry, + ) -> Result { let settings = config.settings; let inner = cyclotron_core::Janitor::new(config.pool).await?; @@ -31,8 +44,17 @@ impl Janitor { (SHARD_ID_KEY.to_string(), settings.shard_id.clone()), ]; + let kafka_liveness = health_registry + .register("rdkafka".to_string(), time::Duration::seconds(30)) + .await; + + let kafka_producer = create_kafka_producer(&config.kafka, kafka_liveness) + .await + .expect("failed to create kafka producer"); + Ok(Self { inner, + kafka_producer, settings, metrics_labels, }) @@ -47,17 +69,43 @@ impl Janitor { let _loop_start = common_metrics::timing_guard(RUN_TIME, &self.metrics_labels); common_metrics::inc(RUN_STARTS, &self.metrics_labels, 1); - let completed = { - let _time = common_metrics::timing_guard(COMPLETED_TIME, &self.metrics_labels); - self.inner.delete_completed_jobs().await? + let aggregated_deletes = { + let _time = common_metrics::timing_guard(CLEANUP_TIME, &self.metrics_labels); + self.inner.delete_completed_and_failed_jobs().await? }; - common_metrics::inc(COMPLETED_COUNT, &self.metrics_labels, completed); - let failed = { - let _time = common_metrics::timing_guard(FAILED_TIME, &self.metrics_labels); - self.inner.delete_failed_jobs().await? 
- }; - common_metrics::inc(FAILED_COUNT, &self.metrics_labels, failed); + let mut completed_count = 0u64; + let mut failed_count = 0u64; + for delete in &aggregated_deletes { + if delete.state == "completed" { + completed_count += delete.count as u64; + } else if delete.state == "failed" { + failed_count += delete.count as u64; + } + } + common_metrics::inc(COMPLETED_COUNT, &self.metrics_labels, completed_count); + common_metrics::inc(FAILED_COUNT, &self.metrics_labels, failed_count); + + match send_iter_to_kafka( + &self.kafka_producer, + APP_METRICS2_TOPIC, + aggregated_deletes + .into_iter() + .map(aggregated_delete_to_app_metric2), + ) + .await + { + Ok(()) => {} + Err(KafkaProduceError::SerializationError { error }) => { + error!("Failed to serialize app_metrics2: {error}"); + } + Err(KafkaProduceError::KafkaProduceError { error }) => { + error!("Failed to produce to app_metrics2 kafka: {error}"); + } + Err(KafkaProduceError::KafkaProduceCanceled) => { + error!("Failed to produce to app_metrics2 kafka (timeout)"); + } + } let poisoned = { let _time = common_metrics::timing_guard(POISONED_TIME, &self.metrics_labels); @@ -98,10 +146,29 @@ impl Janitor { common_metrics::inc(RUN_ENDS, &self.metrics_labels, 1); info!("Janitor loop complete"); Ok(CleanupResult { - completed, - failed, + completed: completed_count, + failed: failed_count, poisoned, stalled, }) } } + +fn aggregated_delete_to_app_metric2(delete: AggregatedDelete) -> AppMetric2 { + let kind = match delete.state.as_str() { + "completed" => AppMetric2Kind::Success, + "failed" => AppMetric2Kind::Failure, + _ => AppMetric2Kind::Unknown, + }; + + AppMetric2 { + team_id: delete.team_id as u32, + timestamp: delete.hour, + app_source: AppMetric2Source::Cyclotron, + app_source_id: delete.function_id.unwrap_or("".to_owned()), + instance_id: None, + metric_kind: kind, + metric_name: "finished_state".to_owned(), + count: delete.count as u32, + } +} diff --git a/rust/cyclotron-janitor/src/main.rs 
b/rust/cyclotron-janitor/src/main.rs index 0db35e52b8bc5..fa0f682601e61 100644 --- a/rust/cyclotron-janitor/src/main.rs +++ b/rust/cyclotron-janitor/src/main.rs @@ -66,7 +66,7 @@ async fn main() { janitor_id, bind ); - let janitor = Janitor::new(janitor_config) + let janitor = Janitor::new(janitor_config, &liveness) .await .expect("failed to create janitor"); diff --git a/rust/cyclotron-janitor/src/metrics_constants.rs b/rust/cyclotron-janitor/src/metrics_constants.rs index 2da1822484ee5..b3e838370a40d 100644 --- a/rust/cyclotron-janitor/src/metrics_constants.rs +++ b/rust/cyclotron-janitor/src/metrics_constants.rs @@ -3,10 +3,8 @@ pub const RUN_TIME: &str = "cyclotron_janitor_total_run_ms"; pub const RUN_ENDS: &str = "cyclotron_janitor_run_ends"; pub const COMPLETED_COUNT: &str = "cyclotron_janitor_completed_jobs"; -pub const COMPLETED_TIME: &str = "cyclotron_janitor_completed_jobs_cleanup_ms"; - pub const FAILED_COUNT: &str = "cyclotron_janitor_failed_jobs"; -pub const FAILED_TIME: &str = "cyclotron_janitor_failed_jobs_cleanup_ms"; +pub const CLEANUP_TIME: &str = "cyclotron_janitor_completed_failed_jobs_cleanup_ms"; pub const POISONED_COUNT: &str = "cyclotron_janitor_poison_pills"; pub const POISONED_TIME: &str = "cyclotron_janitor_poison_pills_cleanup_ms"; diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index 32846d7f8c647..d213477613c78 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -1,10 +1,18 @@ -use chrono::{Duration, Utc}; - +use chrono::{DateTime, Duration, Utc}; +use common_kafka::kafka_messages::app_metrics2::{ + AppMetric2, Kind as AppMetric2Kind, Source as AppMetric2Source, +}; use cyclotron_core::{JobInit, JobState, QueueManager, Worker}; use cyclotron_janitor::{config::JanitorSettings, janitor::Janitor}; +use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr}; +use rdkafka::{ClientConfig, Message}; use 
sqlx::PgPool; +use std::str::FromStr; use uuid::Uuid; +use common_kafka::{test::create_mock_kafka, APP_METRICS2_TOPIC}; + #[sqlx::test(migrations = "../cyclotron-core/migrations")] async fn janitor_test(db: PgPool) { let worker = Worker::from_pool(db.clone()); @@ -15,6 +23,19 @@ async fn janitor_test(db: PgPool) { let stall_timeout = Duration::milliseconds(10); let max_touches = 3; + let (mock_cluster, mock_producer) = create_mock_kafka().await; + mock_cluster + .create_topic(APP_METRICS2_TOPIC, 1, 1) + .expect("failed to create mock app_metrics2 topic"); + + let kafka_consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", mock_cluster.bootstrap_servers()) + .set("group.id", "mock") + .set("auto.offset.reset", "earliest") + .create() + .expect("failed to create mock consumer"); + kafka_consumer.subscribe(&[APP_METRICS2_TOPIC]).unwrap(); + let settings = JanitorSettings { stall_timeout, max_touches, @@ -23,6 +44,7 @@ async fn janitor_test(db: PgPool) { }; let janitor = Janitor { inner: cyclotron_core::Janitor::from_pool(db.clone()), + kafka_producer: mock_producer, settings, metrics_labels: vec![], }; @@ -30,12 +52,13 @@ async fn janitor_test(db: PgPool) { let now = Utc::now() - Duration::seconds(10); let queue_name = "default".to_string(); + let uuid = Uuid::now_v7(); let job_init = JobInit { team_id: 1, queue_name: queue_name.clone(), priority: 0, scheduled: now, - function_id: Some(Uuid::now_v7()), + function_id: Some(uuid.clone()), vm_state: None, parameters: None, blob: None, @@ -60,6 +83,26 @@ async fn janitor_test(db: PgPool) { assert_eq!(result.poisoned, 0); assert_eq!(result.stalled, 0); + { + let kafka_msg = kafka_consumer.recv().await.unwrap(); + let payload_str = String::from_utf8(kafka_msg.payload().unwrap().to_vec()).unwrap(); + let app_metric: AppMetric2 = serde_json::from_str(&payload_str).unwrap(); + + assert_eq!( + app_metric, + AppMetric2 { + team_id: 1, + timestamp: DateTime::<Utc>::from_str("2024-08-30T19:00:00Z").unwrap(), + 
app_source: AppMetric2Source::Cyclotron, + app_source_id: uuid.to_string(), + instance_id: None, + metric_kind: AppMetric2Kind::Success, + metric_name: "finished_state".to_owned(), + count: 1 + } + ); + } + // Second test - if we mark a job as failed, the janitor will clean it up manager.create_job(job_init.clone()).await.unwrap(); let job = worker @@ -78,6 +121,26 @@ async fn janitor_test(db: PgPool) { assert_eq!(result.poisoned, 0); assert_eq!(result.stalled, 0); + { + let kafka_msg = kafka_consumer.recv().await.unwrap(); + let payload_str = String::from_utf8(kafka_msg.payload().unwrap().to_vec()).unwrap(); + let app_metric: AppMetric2 = serde_json::from_str(&payload_str).unwrap(); + + assert_eq!( + app_metric, + AppMetric2 { + team_id: 1, + timestamp: DateTime::<Utc>::from_str("2024-08-30T19:00:00Z").unwrap(), + app_source: AppMetric2Source::Cyclotron, + app_source_id: uuid.to_string(), + instance_id: None, + metric_kind: AppMetric2Kind::Failure, + metric_name: "finished_state".to_owned(), + count: 1 + } + ); + } + + // Third test - if we pick up a job, and then hold it for longer than // the stall timeout, the janitor will reset it. After this, the worker // cannot flush updates to the job, and must re-dequeue it. 
diff --git a/rust/feature-flags/src/v0_endpoint.rs b/rust/feature-flags/src/v0_endpoint.rs index 56734eae32d45..9adfa67e882a3 100644 --- a/rust/feature-flags/src/v0_endpoint.rs +++ b/rust/feature-flags/src/v0_endpoint.rs @@ -85,5 +85,5 @@ fn record_request_metadata( tracing::Span::current().record("method", method.as_str()); tracing::Span::current().record("path", path.as_str().trim_end_matches('/')); tracing::Span::current().record("ip", ip.to_string()); - tracing::Span::current().record("sent_at", &meta.sent_at.unwrap_or(0).to_string()); + tracing::Span::current().record("sent_at", meta.sent_at.unwrap_or(0).to_string()); } diff --git a/rust/hook-common/Cargo.toml b/rust/hook-common/Cargo.toml index e6b2625c23905..b63d7bfd5d973 100644 --- a/rust/hook-common/Cargo.toml +++ b/rust/hook-common/Cargo.toml @@ -9,10 +9,7 @@ workspace = true [dependencies] async-trait = { workspace = true } chrono = { workspace = true } -envconfig = { workspace = true } -health = { path = "../common/health" } http = { workspace = true } -rdkafka = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -21,7 +18,7 @@ thiserror = { workspace = true } time = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -uuid = { workspace = true } +common-kafka = { path = "../common/kafka" } [dev-dependencies] tokio = { workspace = true } # We need a runtime for async tests diff --git a/rust/hook-common/src/kafka_producer.rs b/rust/hook-common/src/kafka_producer.rs deleted file mode 100644 index 92608bcb999c8..0000000000000 --- a/rust/hook-common/src/kafka_producer.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::config::KafkaConfig; - -use health::HealthHandle; -use rdkafka::error::KafkaError; -use rdkafka::producer::FutureProducer; -use rdkafka::ClientConfig; -use tracing::debug; - -pub struct KafkaContext { - liveness: HealthHandle, -} - -impl rdkafka::ClientContext for KafkaContext { - fn stats(&self, _: 
rdkafka::Statistics) { - // Signal liveness, as the main rdkafka loop is running and calling us - self.liveness.report_healthy_blocking(); - - // TODO: Take stats recording pieces that we want from `capture-rs`. - } -} - -pub async fn create_kafka_producer( - config: &KafkaConfig, - liveness: HealthHandle, -) -> Result<FutureProducer<KafkaContext>, KafkaError> { - let mut client_config = ClientConfig::new(); - client_config - .set("bootstrap.servers", &config.kafka_hosts) - .set("statistics.interval.ms", "10000") - .set("linger.ms", config.kafka_producer_linger_ms.to_string()) - .set( - "message.timeout.ms", - config.kafka_message_timeout_ms.to_string(), - ) - .set( - "compression.codec", - config.kafka_compression_codec.to_owned(), - ) - .set( - "queue.buffering.max.kbytes", - (config.kafka_producer_queue_mib * 1024).to_string(), - ); - - if config.kafka_tls { - client_config - .set("security.protocol", "ssl") - .set("enable.ssl.certificate.verification", "false"); - }; - - debug!("rdkafka configuration: {:?}", client_config); - let api: FutureProducer<KafkaContext> = - client_config.create_with_context(KafkaContext { liveness })?; - - // TODO: ping the kafka brokers to confirm configuration is OK (copy capture) - - Ok(api) -} diff --git a/rust/hook-common/src/lib.rs b/rust/hook-common/src/lib.rs index e1446d80c338f..37d193eeed596 100644 --- a/rust/hook-common/src/lib.rs +++ b/rust/hook-common/src/lib.rs @@ -1,7 +1,3 @@ -pub mod config; -pub mod kafka_messages; -pub mod kafka_producer; pub mod pgqueue; pub mod retry; -pub mod test; pub mod webhook; diff --git a/rust/hook-common/src/webhook.rs b/rust/hook-common/src/webhook.rs index 5286629978931..b47e4ba909f39 100644 --- a/rust/hook-common/src/webhook.rs +++ b/rust/hook-common/src/webhook.rs @@ -5,8 +5,8 @@ use std::str::FromStr; use serde::{de::Visitor, Deserialize, Serialize}; -use crate::kafka_messages::app_metrics; use crate::pgqueue::ParseError; +use common_kafka::kafka_messages::app_metrics; /// Supported HTTP methods for webhooks. 
#[derive(Debug, PartialEq, Clone, Copy)] diff --git a/rust/hook-janitor/Cargo.toml b/rust/hook-janitor/Cargo.toml index a4fa315da70f1..dba9bef7e7046 100644 --- a/rust/hook-janitor/Cargo.toml +++ b/rust/hook-janitor/Cargo.toml @@ -24,4 +24,5 @@ time = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } -common-metrics = { path = "../common/metrics" } \ No newline at end of file +common-metrics = { path = "../common/metrics" } +common-kafka = { path = "../common/kafka" } diff --git a/rust/hook-janitor/src/config.rs b/rust/hook-janitor/src/config.rs index 28c34488d476b..9d7a76cc1b98e 100644 --- a/rust/hook-janitor/src/config.rs +++ b/rust/hook-janitor/src/config.rs @@ -1,6 +1,6 @@ use envconfig::Envconfig; -use hook_common::config::KafkaConfig; +use common_kafka::config::KafkaConfig; #[derive(Envconfig)] pub struct Config { diff --git a/rust/hook-janitor/src/main.rs b/rust/hook-janitor/src/main.rs index b7ea4db85ec1d..6ca27fa6e6d6f 100644 --- a/rust/hook-janitor/src/main.rs +++ b/rust/hook-janitor/src/main.rs @@ -9,8 +9,8 @@ use std::{str::FromStr, time::Duration}; use tokio::sync::Semaphore; use webhooks::WebhookCleaner; +use common_kafka::kafka_producer::create_kafka_producer; use common_metrics::setup_metrics_routes; -use hook_common::kafka_producer::create_kafka_producer; mod cleanup; mod config; diff --git a/rust/hook-janitor/src/webhooks.rs b/rust/hook-janitor/src/webhooks.rs index c523a4c59da55..c588c4e27c34c 100644 --- a/rust/hook-janitor/src/webhooks.rs +++ b/rust/hook-janitor/src/webhooks.rs @@ -3,10 +3,9 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use futures::future::join_all; use hook_common::webhook::WebhookJobError; use rdkafka::error::KafkaError; -use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::producer::FutureProducer; use serde::Serialize; use serde_json::error::Error as SerdeError; use 
sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions, Postgres}; @@ -17,10 +16,10 @@ use tracing::{debug, error, info}; use crate::cleanup::Cleaner; +use common_kafka::kafka_messages::app_metrics::{AppMetric, AppMetricCategory}; +use common_kafka::kafka_messages::app_metrics2::{self, AppMetric2}; +use common_kafka::kafka_producer::{send_iter_to_kafka, KafkaContext, KafkaProduceError}; use common_metrics::get_current_timestamp_seconds; -use hook_common::kafka_messages::app_metrics::{AppMetric, AppMetricCategory}; -use hook_common::kafka_messages::app_metrics2::{self, AppMetric2}; -use hook_common::kafka_producer::KafkaContext; #[derive(Error, Debug)] pub enum WebhookCleanerError { @@ -204,8 +203,6 @@ struct QueueDepth { count_retries: i64, } -// TODO: Extract this to a more generic function that produces any iterable that can be -// serialized, and returns more generic errors. async fn send_metrics_to_kafka<T>( kafka_producer: &FutureProducer<KafkaContext>, topic: &str, @@ -214,46 +211,18 @@ where T: Serialize, { - let mut payloads = Vec::new(); - - for metric in metrics { - let payload = serde_json::to_string(&metric) - .map_err(|e| WebhookCleanerError::SerializeRowsError { error: e })?; - payloads.push(payload); - } - - if payloads.is_empty() { - return Ok(()); - } - - let mut delivery_futures = Vec::new(); - - for payload in payloads { - match kafka_producer.send_result(FutureRecord { - topic, - payload: Some(&payload), - partition: None, - key: None::<&str>, - timestamp: None, - headers: None, - }) { - Ok(future) => delivery_futures.push(future), - Err((error, _)) => return Err(WebhookCleanerError::KafkaProduceError { error }), + match send_iter_to_kafka(kafka_producer, topic, metrics).await { + Ok(()) => Ok(()), + Err(KafkaProduceError::SerializationError { error }) => { + Err(WebhookCleanerError::SerializeRowsError { error }) } - } - - for result in join_all(delivery_futures).await { - match result { - Ok(Ok(_)) => {} - Ok(Err((error, _))) 
=> return Err(WebhookCleanerError::KafkaProduceError { error }), - Err(_) => { - // Cancelled due to timeout while retrying - return Err(WebhookCleanerError::KafkaProduceCanceled); - } + Err(KafkaProduceError::KafkaProduceError { error }) => { + Err(WebhookCleanerError::KafkaProduceError { error }) + } + Err(KafkaProduceError::KafkaProduceCanceled) => { + Err(WebhookCleanerError::KafkaProduceCanceled) } } - - Ok(()) } // A simple wrapper type that ensures we don't use any old Transaction object when we need one @@ -658,12 +627,12 @@ impl Cleaner for WebhookCleaner { mod tests { use super::*; - use hook_common::kafka_messages::app_metrics::{ + use common_kafka::kafka_messages::app_metrics::{ Error as WebhookError, ErrorDetails, ErrorType, }; + use common_kafka::test::create_mock_kafka; use hook_common::pgqueue::PgQueueJob; use hook_common::pgqueue::{NewJob, PgQueue, PgTransactionBatch}; - use hook_common::test::create_mock_kafka; use hook_common::webhook::{HttpMethod, WebhookJobMetadata, WebhookJobParameters}; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr}; diff --git a/rust/hook-worker/Cargo.toml b/rust/hook-worker/Cargo.toml index f7aaf59c75298..fdc6f150dfec9 100644 --- a/rust/hook-worker/Cargo.toml +++ b/rust/hook-worker/Cargo.toml @@ -26,7 +26,8 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } url = { version = "2.2" } common-metrics = { path = "../common/metrics" } -common-dns = { path = "../common/dns" } +common-dns = { path = "../common/dns" } +common-kafka = { path = "../common/kafka" } [dev-dependencies] httpmock = { workspace = true } diff --git a/rust/hook-worker/src/config.rs b/rust/hook-worker/src/config.rs index 1fa6c04638698..4739df2fb3b38 100644 --- a/rust/hook-worker/src/config.rs +++ b/rust/hook-worker/src/config.rs @@ -3,7 +3,7 @@ use std::time; use envconfig::Envconfig; -use hook_common::config::KafkaConfig; +use common_kafka::config::KafkaConfig; 
#[derive(Envconfig, Clone)] pub struct Config { diff --git a/rust/hook-worker/src/main.rs b/rust/hook-worker/src/main.rs index 798586bc6ed5e..0aeae27e0a3a0 100644 --- a/rust/hook-worker/src/main.rs +++ b/rust/hook-worker/src/main.rs @@ -6,9 +6,9 @@ use hook_common::pgqueue::PgQueue; use hook_common::retry::RetryPolicy; use std::future::ready; +use common_kafka::kafka_producer::create_kafka_producer; use common_metrics::{serve, setup_metrics_routes}; use health::HealthRegistry; -use hook_common::kafka_producer::create_kafka_producer; use hook_worker::config::Config; use hook_worker::error::WorkerError; use hook_worker::worker::WebhookWorker; diff --git a/rust/hook-worker/src/worker.rs b/rust/hook-worker/src/worker.rs index bba15cd67c9a4..c68cdfc3f0cda 100644 --- a/rust/hook-worker/src/worker.rs +++ b/rust/hook-worker/src/worker.rs @@ -15,7 +15,7 @@ use tokio::sync; use tokio::time::{sleep, Duration}; use tracing::error; -use hook_common::kafka_producer::KafkaContext; +use common_kafka::kafka_producer::KafkaContext; use hook_common::pgqueue::PgTransactionBatch; use hook_common::{ pgqueue::{Job, PgQueue, PgQueueJob, PgTransactionJob, RetryError, RetryInvalidError}, @@ -793,9 +793,9 @@ mod tests { // Note we are ignoring some warnings in this module. // This is due to a long-standing cargo bug that reports imports and helper functions as unused. // See: https://github.com/rust-lang/rust/issues/46379. + use common_kafka::test::create_mock_kafka; use health::HealthRegistry; use hook_common::pgqueue::{DatabaseError, NewJob}; - use hook_common::test::create_mock_kafka; use hook_common::webhook::WebhookJobMetadata; use sqlx::PgPool;