diff --git a/Cargo.lock b/Cargo.lock
index 584fdf6..cb9ac8d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1280,6 +1280,7 @@ dependencies = [
  "arrow",
  "datafusion",
  "pyo3",
+ "serde_json",
  "thiserror",
 ]
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index 2b371e8..9c28e89 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -15,3 +15,4 @@ datafusion = { workspace = true }
 arrow = { workspace = true }
 thiserror = "1.0.63"
 pyo3 = { workspace = true, optional = true }
+serde_json.workspace = true
diff --git a/crates/common/src/error/mod.rs b/crates/common/src/error/mod.rs
index 8fae8be..3849ee9 100644
--- a/crates/common/src/error/mod.rs
+++ b/crates/common/src/error/mod.rs
@@ -3,7 +3,7 @@ use thiserror::Error;
 
 use arrow::error::ArrowError;
 use datafusion::error::DataFusionError;
-
+use serde_json::Error as JsonError;
 #[cfg(feature = "python")]
 mod py_err;
@@ -22,6 +22,8 @@ pub enum DenormalizedError {
     KafkaConfig(String),
     #[error("Arrow Error")]
     Arrow(#[from] ArrowError),
+    #[error("Json Error")]
+    Json(#[from] JsonError),
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
diff --git a/crates/core/src/datasource/kafka/kafka_config.rs b/crates/core/src/datasource/kafka/kafka_config.rs
index c96a3bb..03ab399 100644
--- a/crates/core/src/datasource/kafka/kafka_config.rs
+++ b/crates/core/src/datasource/kafka/kafka_config.rs
@@ -6,6 +6,7 @@ use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
 use datafusion::logical_expr::SortExpr;
 
+use crate::formats::StreamEncoding;
 use crate::physical_plan::utils::time::TimestampUnit;
 use crate::utils::arrow_helpers::infer_arrow_schema_from_json_value;
 use denormalized_common::error::{DenormalizedError, Result};
@@ -307,28 +308,6 @@ impl KafkaTopicBuilder {
     }
 }
 
-/// The data encoding for [`StreamTable`]
-#[derive(Debug, Clone, Copy)]
-pub enum StreamEncoding {
-    Avro,
-    Json,
-}
-
-impl FromStr for StreamEncoding {
-    type Err = DenormalizedError;
-
-    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        match s.to_ascii_lowercase().as_str() {
-            "avro" => Ok(Self::Avro),
-            "json" => Ok(Self::Json),
-            _ => Err(Self::Err::KafkaConfig(format!(
-                "Unrecognised StreamEncoding {}",
-                s
-            ))),
-        }
-    }
-}
-
 fn get_topic_partition_count(bootstrap_servers: String, topic: String) -> Result<i32> {
     let mut client_config = ClientConfig::new();
     client_config.set("bootstrap.servers", bootstrap_servers.to_string());
diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs
index 792c5b0..e82f530 100644
--- a/crates/core/src/datasource/kafka/kafka_stream_read.rs
+++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -9,14 +9,13 @@ use denormalized_orchestrator::channel_manager::{create_channel, get_sender};
 use denormalized_orchestrator::orchestrator::{self, OrchestrationMessage};
 use log::{debug, error};
 use serde::{Deserialize, Serialize};
-use serde_json::Value;
-//use tracing::{debug, error};
 
 use crate::config_extensions::denormalized_config::DenormalizedConfig;
+use crate::formats::decoders::json::JsonDecoder;
+use crate::formats::decoders::Decoder;
 use crate::physical_plan::stream_table::PartitionStreamExt;
 use crate::physical_plan::utils::time::array_to_timestamp_array;
 use crate::state_backend::rocksdb_backend::get_global_rocksdb;
-use crate::utils::arrow_helpers::json_records_to_arrow_record_batch;
 
 use arrow::compute::{max, min};
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -24,7 +23,7 @@
 use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
 use datafusion::physical_plan::streaming::PartitionStream;
 use rdkafka::consumer::{Consumer, StreamConsumer};
-use rdkafka::{ClientConfig, Message, Timestamp, TopicPartitionList};
+use rdkafka::{ClientConfig, Message, TopicPartitionList};
 
 use super::KafkaReadConfig;
 
@@ -134,11 +133,11 @@ impl PartitionStream for KafkaStreamRead {
         let mut builder = RecordBatchReceiverStreamBuilder::new(self.config.schema.clone(), 1);
         let tx = builder.tx();
         let canonical_schema = self.config.schema.clone();
-        let json_schema = self.config.original_schema.clone();
+        let arrow_schema = self.config.original_schema.clone();
         let timestamp_column: String = self.config.timestamp_column.clone();
         let timestamp_unit = self.config.timestamp_unit.clone();
         let batch_timeout = Duration::from_millis(100);
-        let mut channel_tag = String::from("");
+        let mut channel_tag: String = String::from("");
         if orchestrator::SHOULD_CHECKPOINT {
             let node_id = self.exec_node_id.unwrap();
             channel_tag = format!("{}_{}", node_id, partition_tag);
@@ -151,6 +150,7 @@
                 let msg = OrchestrationMessage::RegisterStream(channel_tag.clone());
                 orchestrator_sender.as_ref().unwrap().send(msg).unwrap();
             }
+            let mut json_decoder: JsonDecoder = JsonDecoder::new(arrow_schema.clone());
             loop {
                 let mut last_offsets = HashMap::new();
                 if let Some(backend) = &state_backend {
@@ -181,7 +181,6 @@
                 }
 
                 let mut offsets_read: HashMap<i32, i64> = HashMap::new();
-                let mut batch: Vec<Value> = Vec::new();
                 let start_time = datafusion::common::instant::Instant::now();
 
                 while start_time.elapsed() < batch_timeout {
@@ -192,30 +191,9 @@
                     .await
                     {
                         Ok(Ok(m)) => {
-                            let timestamp = match m.timestamp() {
-                                Timestamp::NotAvailable => -1_i64,
-                                Timestamp::CreateTime(ts) => ts,
-                                Timestamp::LogAppendTime(ts) => ts,
-                            };
-                            let key = m.key();
                             let payload = m.payload().expect("Message payload is empty");
-                            let mut deserialized_record: HashMap<String, Value> =
-                                serde_json::from_slice(payload).unwrap();
-                            deserialized_record
-                                .insert("kafka_timestamp".to_string(), Value::from(timestamp));
-                            if let Some(key) = key {
-                                deserialized_record.insert(
-                                    "kafka_key".to_string(),
-                                    Value::from(String::from_utf8_lossy(key)),
-                                );
-                            } else {
-                                deserialized_record
-                                    .insert("kafka_key".to_string(), Value::from(String::from("")));
-                            }
-                            let new_payload = serde_json::to_value(deserialized_record).unwrap();
+                            json_decoder.push_to_buffer(payload.to_owned());
                             offsets_read.insert(m.partition(), m.offset());
-                            batch.push(new_payload);
                         }
                         Ok(Err(err)) => {
                             error!("Error reading from Kafka {:?}", err);
@@ -228,12 +206,8 @@
                     }
                 }
 
-                //debug!("Batch size {}", batch.len());
-
-                if !batch.is_empty() {
-                    let record_batch: RecordBatch =
-                        json_records_to_arrow_record_batch(batch, json_schema.clone());
-
+                if !offsets_read.is_empty() {
+                    let record_batch = json_decoder.to_record_batch().unwrap();
                     let ts_column = record_batch
                         .column_by_name(timestamp_column.as_str())
                         .map(|ts_col| {
diff --git a/crates/core/src/datasource/kafka/mod.rs b/crates/core/src/datasource/kafka/mod.rs
index b5fab34..b3bd7f5 100644
--- a/crates/core/src/datasource/kafka/mod.rs
+++ b/crates/core/src/datasource/kafka/mod.rs
@@ -3,9 +3,7 @@ pub mod kafka_stream_read;
 pub mod topic_reader;
 pub mod topic_writer;
 
-pub use kafka_config::{
-    ConnectionOpts, KafkaReadConfig, KafkaTopicBuilder, KafkaWriteConfig, StreamEncoding,
-};
+pub use kafka_config::{ConnectionOpts, KafkaReadConfig, KafkaTopicBuilder, KafkaWriteConfig};
 pub use kafka_stream_read::KafkaStreamRead;
 pub use topic_reader::TopicReader;
 pub use topic_writer::TopicWriter;
diff --git a/crates/core/src/formats/decoders/json.rs b/crates/core/src/formats/decoders/json.rs
new file mode 100644
index 0000000..a240d37
--- /dev/null
+++ b/crates/core/src/formats/decoders/json.rs
@@ -0,0 +1,148 @@
+use std::sync::Arc;
+
+use arrow_schema::Schema;
+use serde_json::Value;
+
+use crate::utils::arrow_helpers::json_records_to_arrow_record_batch;
+
+use super::Decoder;
+
+pub struct JsonDecoder {
+    schema: Arc<Schema>,
+    cache: Vec<Vec<u8>>,
+}
+
+impl JsonDecoder {
+    pub fn new(schema: Arc<Schema>) -> Self {
+        JsonDecoder {
+            schema: schema.clone(),
+            cache: Vec::new(),
+        }
+    }
+}
+
+impl Decoder for JsonDecoder {
+    fn to_record_batch(
+        &mut self,
+    ) -> Result<arrow_array::RecordBatch, denormalized_common::DenormalizedError> {
+        let mut combined_json = Vec::new();
+        combined_json.push(b'[');
+        for (index, row) in self.cache.iter().enumerate() {
+            if index > 0 {
+                combined_json.push(b',');
+            }
+            combined_json.extend_from_slice(row.as_slice());
+        }
+        combined_json.push(b']');
+        self.cache.clear();
+        let result: Vec<Value> = serde_json::from_slice(&combined_json)?;
+        Ok(json_records_to_arrow_record_batch(
+            result,
+            self.schema.clone(),
+        ))
+    }
+
+    fn push_to_buffer(&mut self, bytes: Vec<u8>) {
+        self.cache.push(bytes);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::{Int64Array, StringArray};
+    use arrow_schema::{DataType, Field};
+    use std::sync::Arc;
+
+    fn create_test_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]))
+    }
+
+    fn create_test_json(id: i64, name: &str) -> Vec<u8> {
+        format!(r#"{{"id": {}, "name": "{}"}}"#, id, name).into_bytes()
+    }
+
+    #[test]
+    fn test_json_decoder_new() {
+        let schema = create_test_schema();
+        let decoder = JsonDecoder::new(schema.clone());
+        assert_eq!(decoder.schema, schema);
+        assert!(decoder.cache.is_empty());
+    }
+
+    #[test]
+    fn test_push_to_buffer() {
+        let schema = create_test_schema();
+        let mut decoder = JsonDecoder::new(schema);
+        let json1 = create_test_json(1, "Alice");
+        let json2 = create_test_json(2, "Bob");
+
+        decoder.push_to_buffer(json1.clone());
+        decoder.push_to_buffer(json2.clone());
+
+        assert_eq!(decoder.cache, vec![json1, json2]);
+    }
+
+    #[test]
+    fn test_to_record_batch() -> Result<(), denormalized_common::DenormalizedError> {
+        let schema = create_test_schema();
+        let mut decoder = JsonDecoder::new(schema.clone());
+
+        decoder.push_to_buffer(create_test_json(1, "Alice"));
+        decoder.push_to_buffer(create_test_json(2, "Bob"));
+        decoder.push_to_buffer(create_test_json(3, "Charlie"));
+
+        let record_batch = decoder.to_record_batch()?;
+
+        assert_eq!(record_batch.num_columns(), 2);
+        assert_eq!(record_batch.num_rows(), 3);
+
+        let id_array = record_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        let name_array = record_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+
+        assert_eq!(id_array.value(0), 1);
+        assert_eq!(id_array.value(1), 2);
+        assert_eq!(id_array.value(2), 3);
+
+        assert_eq!(name_array.value(0), "Alice");
+        assert_eq!(name_array.value(1), "Bob");
+        assert_eq!(name_array.value(2), "Charlie");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_to_record_batch_empty() -> Result<(), denormalized_common::DenormalizedError> {
+        let schema = create_test_schema();
+        let mut decoder = JsonDecoder::new(schema.clone());
+
+        let record_batch = decoder.to_record_batch()?;
+
+        assert_eq!(record_batch.num_columns(), 2);
+        assert_eq!(record_batch.num_rows(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_to_record_batch_invalid_json() {
+        let schema = create_test_schema();
+        let mut decoder = JsonDecoder::new(schema.clone());
+
+        decoder.push_to_buffer(b"{invalid_json}".to_vec());
+
+        let result = decoder.to_record_batch();
+        assert!(result.is_err());
+    }
+}
diff --git a/crates/core/src/formats/decoders/mod.rs b/crates/core/src/formats/decoders/mod.rs
new file mode 100644
index 0000000..ae56178
--- /dev/null
+++ b/crates/core/src/formats/decoders/mod.rs
@@ -0,0 +1,10 @@
+use arrow_array::RecordBatch;
+use denormalized_common::DenormalizedError;
+
+pub trait Decoder {
+    fn push_to_buffer(&mut self, bytes: Vec<u8>);
+
+    fn to_record_batch(&mut self) -> Result<RecordBatch, DenormalizedError>;
+}
+
+pub mod json;
diff --git a/crates/core/src/formats/mod.rs b/crates/core/src/formats/mod.rs
new file mode 100644
index 0000000..947985d
--- /dev/null
+++ b/crates/core/src/formats/mod.rs
@@ -0,0 +1,26 @@
+use std::str::FromStr;
+
+use denormalized_common::DenormalizedError;
+
+#[derive(Debug, Clone, Copy)]
+pub enum StreamEncoding {
+    Avro,
+    Json,
+}
+
+impl FromStr for StreamEncoding {
+    type Err = DenormalizedError;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "avro" => Ok(Self::Avro),
+            "json" => Ok(Self::Json),
+            _ => Err(Self::Err::KafkaConfig(format!(
+                "Unrecognised StreamEncoding {}",
+                s
+            ))),
+        }
+    }
+}
+
+pub mod decoders;
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index b915289..2fea016 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -3,6 +3,7 @@ pub mod config_extensions;
 pub mod context;
 pub mod datasource;
 pub mod datastream;
+pub mod formats;
 pub mod logical_plan;
 pub mod physical_optimizer;
 pub mod physical_plan;
diff --git a/crates/core/src/utils/arrow_helpers.rs b/crates/core/src/utils/arrow_helpers.rs
index aca7290..5fbc075 100644
--- a/crates/core/src/utils/arrow_helpers.rs
+++ b/crates/core/src/utils/arrow_helpers.rs
@@ -42,7 +42,7 @@ pub fn json_records_to_arrow_record_batch(
     if records.is_empty() {
         return RecordBatch::new_empty(schema);
     }
-    let string_stream: Vec<String> = records.iter().map(|r| r.to_string()).collect();
+    let string_stream: Vec<String> = records.iter().map(|r: &JValue| r.to_string()).collect();
     let cursor: Cursor<String> = Cursor::new(string_stream.join("\n"));
     let mut reader = ReaderBuilder::new(schema).build(cursor).unwrap();
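
For reviewers unfamiliar with the new module layout: the `Decoder` trait added in `crates/core/src/formats/decoders/mod.rs` is used by buffering raw message bytes and then draining them into a single Arrow `RecordBatch`, exactly as `KafkaStreamRead` and the unit tests above do. A minimal sketch written as if inside `crates/core`; the `id`/`name` schema, the payloads, and the helper name `decode_rows` are illustrative and not part of this change:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

use crate::formats::decoders::json::JsonDecoder;
use crate::formats::decoders::Decoder;

// Illustrative helper, not part of this diff.
fn decode_rows() -> Result<arrow_array::RecordBatch, denormalized_common::DenormalizedError> {
    // Arrow schema describing the raw JSON payloads read from the topic.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Buffer raw Kafka message bytes as they arrive...
    let mut decoder = JsonDecoder::new(schema);
    decoder.push_to_buffer(br#"{"id": 1, "name": "Alice"}"#.to_vec());
    decoder.push_to_buffer(br#"{"id": 2, "name": "Bob"}"#.to_vec());

    // ...then convert everything buffered so far into one RecordBatch.
    // This also clears the decoder's internal cache before the next poll cycle.
    decoder.to_record_batch()
}
```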