diff --git a/.gitignore b/.gitignore
index 5bbc1b19..229ccfeb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,5 @@
 parseable
 parseable_*
 parseable-env-secret
 cache
+.idea
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 00000000..13566b81
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/Cargo.lock b/Cargo.lock
index b5dc1dd5..e298ab93 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2634,6 +2634,18 @@ version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
 
+[[package]]
+name = "libz-sys"
+version = "1.1.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.1.4"
@@ -2924,6 +2936,27 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "num_enum"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
+dependencies = [
+ "num_enum_derive",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
+dependencies = [
+ "proc-macro-crate 1.3.1",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "num_threads"
 version = "0.1.7"
@@ -3146,6 +3179,7 @@ dependencies = [
  "prost",
  "prost-build",
  "rand",
+ "rdkafka",
  "regex",
  "relative-path",
  "reqwest 0.11.27",
@@ -3329,13 +3363,23 @@ dependencies = [
  "syn 2.0.79",
 ]
 
+[[package]]
+name = "proc-macro-crate"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
+dependencies = [
+ "once_cell",
+ "toml_edit 0.19.15",
+]
+
 [[package]]
 name = "proc-macro-crate"
 version = "3.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8ecf48c7ca261d60b74ab1a7b20da18bede46776b2e55535cb958eb595c5fa7b"
 dependencies = [
- "toml_edit",
+ "toml_edit 0.22.22",
 ]
 
 [[package]]
@@ -3614,6 +3658,36 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "rdkafka"
+version = "0.36.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.7.0+2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615"
+dependencies = [
+ "libc",
+ "libz-sys",
+ "num_enum",
+ "pkg-config",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"
@@ -3789,7 +3863,7 @@ checksum = "825ea780781b15345a146be27eaefb05085e337e869bff01b4306a4fd4a9ad5a"
 dependencies = [
  "cfg-if",
  "glob",
- "proc-macro-crate",
+ "proc-macro-crate 3.2.0",
  "proc-macro2",
  "quote",
  "regex",
@@ -4308,6 +4382,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
 dependencies = [
  "proc-macro2",
+ "quote",
  "unicode-ident",
 ]
@@ -4582,7 +4657,7 @@ dependencies = [
  "serde",
  "serde_spanned",
  "toml_datetime",
- "toml_edit",
+ "toml_edit 0.22.22",
 ]
 
 [[package]]
@@ -4594,6 +4669,17 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "toml_edit"
+version = "0.19.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
+dependencies = [
+ "indexmap 2.5.0",
+ "toml_datetime",
+ "winnow 0.5.40",
+]
+
 [[package]]
 name = "toml_edit"
 version = "0.22.22"
@@ -4604,7 +4690,7 @@ dependencies = [
  "serde",
  "serde_spanned",
  "toml_datetime",
- "winnow",
+ "winnow 0.6.20",
 ]
 
 [[package]]
@@ -4922,6 +5008,12 @@ dependencies = [
  "syn 2.0.79",
 ]
 
+[[package]]
+name = "vcpkg"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+
 [[package]]
 name = "vergen"
 version = "8.3.1"
@@ -5431,6 +5523,15 @@ version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
 
+[[package]]
+name = "winnow"
+version = "0.5.40"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "winnow"
 version = "0.6.20"
diff --git a/server/Cargo.toml b/server/Cargo.toml
index dcf28a3e..9498d969 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -106,6 +106,7 @@ path-clean = "1.0.1"
 prost = "0.13.3"
 prometheus-parse = "0.2.5"
 sha2 = "0.10.8"
+rdkafka = "0.36.2"
 
 [build-dependencies]
 cargo_toml = "0.20.1"
diff --git a/server/src/kafka.rs b/server/src/kafka.rs
new file mode 100644
index 00000000..483c7ef7
--- /dev/null
+++ b/server/src/kafka.rs
@@ -0,0 +1,231 @@
+use arrow_schema::Field;
+use chrono::Utc;
+use futures_util::StreamExt;
+use rdkafka::config::ClientConfig;
+use rdkafka::consumer::stream_consumer::StreamConsumer;
+use rdkafka::consumer::Consumer;
+use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError};
+use rdkafka::message::BorrowedMessage;
+use rdkafka::util::Timeout;
+use rdkafka::{Message, TopicPartitionList};
+use std::env::VarError;
+use std::fmt::Display;
+use std::num::ParseIntError;
+use std::sync::Arc;
+use std::{collections::HashMap, env, fmt::Debug, str::FromStr, time::Duration};
+use tokio::task::{self, JoinHandle};
+
+use crate::{
+    event::{
+        self,
+        error::EventError,
+        format::{self, EventFormat},
+    },
+    handlers::http::ingest::{create_stream_if_not_exists, PostError},
+    metadata::{error::stream_info::MetadataError, STREAM_INFO},
+    storage::StreamType,
+};
+
+enum SslProtocol {
+    Plaintext,
+    Ssl,
+    SaslPlaintext,
+    SaslSsl,
+}
+impl Display for SslProtocol {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(match self {
+            SslProtocol::Plaintext => "plaintext",
+            SslProtocol::Ssl => "ssl",
+            SslProtocol::SaslPlaintext => "sasl_plaintext",
+            SslProtocol::SaslSsl => "sasl_ssl",
+        })
+    }
+}
+impl FromStr for SslProtocol {
+    type Err = KafkaError;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "plaintext" => Ok(SslProtocol::Plaintext),
+            "ssl" => Ok(SslProtocol::Ssl),
+            "sasl_plaintext" => Ok(SslProtocol::SaslPlaintext),
+            "sasl_ssl" => Ok(SslProtocol::SaslSsl),
+            _ => Err(KafkaError::InvalidSslProtocolError(s.to_string())),
+        }
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum KafkaError {
+    #[error("Error loading environment variable {0}")]
+    NoVarError(&'static str),
+
+    #[error("Kafka error {0}")]
+    NativeError(#[from] NativeKafkaError),
+    #[error("RDKafka error {0}")]
+    RDKError(#[from] RDKafkaError),
+
+    #[error("Error parsing int {1} for environment variable {0}")]
+    ParseIntError(&'static str, ParseIntError),
+    #[error("Error parsing duration {1} for environment variable {0}")]
+    ParseDurationError(&'static str, ParseIntError),
+
+    #[error("Stream not found: #{0}")]
+    StreamNotFound(String),
+    #[error("Post error: #{0}")]
+    PostError(#[from] PostError),
+    #[error("Metadata error: #{0}")]
+    MetadataError(#[from] MetadataError),
+    #[error("Event error: #{0}")]
+    EventError(#[from] EventError),
+    #[error("JSON error: #{0}")]
+    JsonError(#[from] serde_json::Error),
+    #[error("Invalid group offset storage: #{0}")]
+    InvalidGroupOffsetStorage(String),
+
+    #[error("Invalid SSL protocol: #{0}")]
+    InvalidSslProtocolError(String),
+    #[error("Invalid unicode for environment variable {0}")]
+    EnvNotUnicode(&'static str),
+}
+
+fn load_env_or_err(key: &'static str) -> Result<String, KafkaError> {
+    env::var(key).map_err(|_| KafkaError::NoVarError(key))
+}
+fn parse_auto_env<T>(key: &'static str) -> Result<Option<T>, <T as FromStr>::Err>
+where
+    T: FromStr,
+{
+    Ok(if let Ok(val) = env::var(key) {
+        Some(val.parse::<T>()?)
+    } else {
+        None
+    })
+}
+// Reads either `<KEY>_S` (seconds) or `<KEY>_M` (minutes) into a Duration.
+fn handle_duration_env_prefix(key: &'static str) -> Result<Option<Duration>, ParseIntError> {
+    if let Ok(raw_secs) = env::var(format!("{key}_S")) {
+        Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)?)))
+    } else if let Ok(raw_mins) = env::var(format!("{key}_M")) {
+        Ok(Some(Duration::from_secs(u64::from_str(&raw_mins)? * 60)))
+    } else {
+        Ok(None)
+    }
+}
+fn parse_i32_env(key: &'static str) -> Result<Option<i32>, KafkaError> {
+    parse_auto_env::<i32>(key).map_err(|raw| KafkaError::ParseIntError(key, raw))
+}
+
+fn parse_duration_env_prefixed(key_prefix: &'static str) -> Result<Option<Duration>, KafkaError> {
+    handle_duration_env_prefix(key_prefix)
+        .map_err(|raw| KafkaError::ParseDurationError(key_prefix, raw))
+}
+
+// Any value other than "0" or "false" counts as true.
+fn get_flag_env_val(key: &'static str) -> Result<Option<bool>, KafkaError> {
+    let raw = env::var(key);
+    match raw {
+        Ok(val) => Ok(Some(val != "0" && val != "false")),
+        Err(VarError::NotPresent) => Ok(None),
+        Err(VarError::NotUnicode(_)) => Err(KafkaError::EnvNotUnicode(key)),
+    }
+}
+
+// Builds a StreamConsumer from the KAFKA_* environment variables.
+fn setup_consumer() -> Result<StreamConsumer, KafkaError> {
+    let hosts = load_env_or_err("KAFKA_HOSTS")?;
+    let topic = load_env_or_err("KAFKA_TOPIC")?;
+
+    let mut conf = ClientConfig::new();
+    conf.set("bootstrap.servers", &hosts);
+
+    if let Ok(val) = env::var("KAFKA_CLIENT_ID") {
+        conf.set("client.id", &val);
+    }
+
+    // Flag name assumed: toggles librdkafka's api.version.request probe.
+    if let Some(val) = get_flag_env_val("KAFKA_API_VERSION_REQUEST")? {
+        conf.set("api.version.request", val.to_string());
+    }
+    if let Ok(val) = env::var("KAFKA_GROUP") {
+        conf.set("group.id", &val);
+    }
+
+    if let Ok(val) = env::var("KAFKA_SECURITY_PROTOCOL") {
+        let mapped: SslProtocol = val.parse()?;
+        conf.set("security.protocol", &mapped.to_string());
+    }
+    let consumer: StreamConsumer = conf.create()?;
+    // Without a subscription the stream below never yields any messages.
+    consumer.subscribe(&[topic.as_str()])?;
+
+    if let Ok(vals_raw) = env::var("KAFKA_PARTITIONS") {
+        let vals = vals_raw
+            .split(',')
+            .map(i32::from_str)
+            .collect::<Result<Vec<i32>, ParseIntError>>()
+            .map_err(|raw| KafkaError::ParseIntError("KAFKA_PARTITIONS", raw))?;
+
+        let mut parts = TopicPartitionList::new();
+        for val in vals {
+            parts.add_partition(&topic, val);
+        }
+        consumer.seek_partitions(parts, Timeout::Never)?;
+    }
+    Ok(consumer)
+}
+
+fn resolve_schema(stream_name: &str) -> Result<HashMap<String, Arc<Field>>, KafkaError> {
+    let hash_map = STREAM_INFO.read().unwrap();
+    let raw = hash_map
+        .get(stream_name)
+        .ok_or_else(|| KafkaError::StreamNotFound(stream_name.to_owned()))?;
+    Ok(raw.schema.clone())
+}
+// Parses a message payload as JSON and ingests it into the named stream.
+async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> {
+    log::debug!("{}: Message: {:?}", stream_name, msg);
+    if let Some(payload) = msg.payload() {
+        let schema = resolve_schema(stream_name)?;
+        let event = format::json::Event {
+            data: serde_json::from_slice(payload)?,
+            tags: String::default(),
+            metadata: String::default(),
+        };
+        log::debug!("Generated event: {:?}", event.data);
+        let (rb, is_first) = event.into_recordbatch(schema, None, None).unwrap();
+
+        event::Event {
+            rb,
+            stream_name: stream_name.to_string(),
+            origin_format: "json",
+            origin_size: payload.len() as u64,
+            is_first_event: is_first,
+            parsed_timestamp: Utc::now().naive_utc(),
+            time_partition: None,
+            custom_partition_values: HashMap::new(),
+            stream_type: StreamType::UserDefined,
+        }
+        .process()
+        .await?;
+    } else {
+        log::debug!("{}: message has no payload, skipping", stream_name);
+    }
+    Ok(())
+}
+
+// Spawns a background task that consumes KAFKA_TOPIC into a stream of the
+// same name; returns a no-op handle when KAFKA_TOPIC is not set.
+pub async fn setup_integration() -> Result<JoinHandle<()>, KafkaError> {
+    let my_res = if let Ok(stream_name) = env::var("KAFKA_TOPIC") {
+        log::info!("Setting up Kafka integration for {stream_name}");
+        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+
+        let res = task::spawn(async move {
+            let consumer = setup_consumer().unwrap();
+            let mut stream = consumer.stream();
+            loop {
+                while let Some(curr) = stream.next().await {
+                    // Any consumer or ingestion error aborts this task.
+                    let msg = curr.unwrap();
+                    ingest_message(&stream_name, msg).await.unwrap();
+                }
+            }
+        });
+        log::info!("Done setting up Kafka integration");
+        res
+    } else {
+        task::spawn_blocking(|| {})
+    };
+    Ok(my_res)
+}
diff --git a/server/src/main.rs b/server/src/main.rs
index fca2ca30..0d47cb29 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -25,6 +25,7 @@ mod cli;
 mod event;
 mod handlers;
 mod hottier;
+mod kafka;
 mod livetail;
 mod localcache;
 mod metadata;
@@ -58,6 +59,7 @@ pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
 
+    kafka::setup_integration().await?;
     // these are empty ptrs so mem footprint should be minimal
     let server: Arc<dyn ParseableServer> = match CONFIG.parseable.mode {
         Mode::Query => Arc::new(QueryServer),
@@ -68,6 +70,5 @@
     };
 
     server.init().await?;
-
     Ok(())
 }